function getVideo (req: express.Request, res: express.Response) {
const videoInstance = res.locals.video
+ if (videoInstance.isOutdated()) {
+ JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', videoUrl: videoInstance.url } })
+ .catch(err => logger.error('Cannot create AP refresher job for video %s.', videoInstance.url, { err }))
+ }
+
return res.json(videoInstance.toFormattedDetailsJSON())
}
return res.json({ description })
}
-async function listVideos (req: express.Request, res: express.Response, next: express.NextFunction) {
+async function listVideos (req: express.Request, res: express.Response) {
const resultList = await VideoModel.listForApi({
start: req.query.start,
count: req.query.count,
'video-file': 1,
'video-import': 1,
'email': 5,
- 'videos-views': 1
+ 'videos-views': 1,
+ 'activitypub-refresher': 1
}
const JOB_CONCURRENCY: { [ id in JobType ]: number } = {
'activitypub-http-broadcast': 1,
'video-file': 1,
'video-import': 1,
'email': 5,
- 'videos-views': 1
+ 'videos-views': 1,
+ 'activitypub-refresher': 1
}
const JOB_TTL: { [ id in JobType ]: number } = {
'activitypub-http-broadcast': 60000 * 10, // 10 minutes
'video-file': 1000 * 3600 * 48, // 2 days, transcoding could be long
'video-import': 1000 * 3600 * 2, // hours
'email': 60000 * 10, // 10 minutes
- 'videos-views': undefined // Unlimited
+ 'videos-views': undefined, // Unlimited
+ 'activitypub-refresher': 60000 * 10 // 10 minutes
}
const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
'videos-views': {
- cron: '1 * * * *' // At 1 minutes past the hour
+ cron: '1 * * * *' // At 1 minute past the hour
}
}
// ---------------------------------------------------------------------------
-const PRIVATE_RSA_KEY_SIZE = 2048
+let PRIVATE_RSA_KEY_SIZE = 2048
// Password encryption
const BCRYPT_SALT_SIZE = 10
// Special constants for a test instance
if (isTestInstance() === true) {
+ PRIVATE_RSA_KEY_SIZE = 1024
+
ACTOR_FOLLOW_SCORE.BASE = 20
REMOTE_SCHEME.HTTP = 'http'
videoObject,
account: actor.Account,
channel: channelActor.VideoChannel,
- updateViews: true,
overrideTo: activity.to
}
return updateVideoFromAP(updateOptions)
shares: boolean
comments: boolean
thumbnail: boolean
- refreshVideo: boolean
+ refreshVideo?: boolean
}
async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: VideoTorrentObject, syncParam: SyncParam) {
logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid)
async function getOrCreateVideoAndAccountAndChannel (options: {
videoObject: VideoTorrentObject | string,
syncParam?: SyncParam,
- fetchType?: VideoFetchByUrlType,
- refreshViews?: boolean
+ fetchType?: VideoFetchByUrlType
}) {
// Default params
const syncParam = options.syncParam || { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true, refreshVideo: false }
const fetchType = options.fetchType || 'all'
- const refreshViews = options.refreshViews || false
// Get video url
const videoUrl = getAPUrl(options.videoObject)
const refreshOptions = {
video: videoFromDatabase,
fetchedType: fetchType,
- syncParam,
- refreshViews
+ syncParam
}
- const p = refreshVideoIfNeeded(refreshOptions)
- if (syncParam.refreshVideo === true) videoFromDatabase = await p
+
+ if (syncParam.refreshVideo === true) videoFromDatabase = await refreshVideoIfNeeded(refreshOptions)
+ else await JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', videoUrl: videoFromDatabase.url } })
return { video: videoFromDatabase }
}
videoObject: VideoTorrentObject,
account: AccountModel,
channel: VideoChannelModel,
- updateViews: boolean,
overrideTo?: string[]
}) {
logger.debug('Updating remote video "%s".', options.videoObject.uuid)
options.video.set('publishedAt', videoData.publishedAt)
options.video.set('privacy', videoData.privacy)
options.video.set('channelId', videoData.channelId)
+ options.video.set('views', videoData.views)
- if (options.updateViews === true) options.video.set('views', videoData.views)
await options.video.save(sequelizeOptions)
{
}
}
+async function refreshVideoIfNeeded (options: {
+ video: VideoModel,
+ fetchedType: VideoFetchByUrlType,
+ syncParam: SyncParam
+}): Promise<VideoModel> {
+ if (!options.video.isOutdated()) return options.video
+
+ // We need more attributes if the argument video was fetched with not enough joints
+ const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url)
+
+ try {
+ const { response, videoObject } = await fetchRemoteVideo(video.url)
+ if (response.statusCode === 404) {
+ logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url)
+
+ // Video does not exist anymore
+ await video.destroy()
+ return undefined
+ }
+
+ if (videoObject === undefined) {
+ logger.warn('Cannot refresh remote video %s: invalid body.', video.url)
+
+ await video.setAsRefreshed()
+ return video
+ }
+
+ const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject)
+ const account = await AccountModel.load(channelActor.VideoChannel.accountId)
+
+ const updateOptions = {
+ video,
+ videoObject,
+ account,
+ channel: channelActor.VideoChannel
+ }
+ await retryTransactionWrapper(updateVideoFromAP, updateOptions)
+ await syncVideoExternalAttributes(video, videoObject, options.syncParam)
+
+ return video
+ } catch (err) {
+ logger.warn('Cannot refresh video %s.', options.video.url, { err })
+
+ // Don't refresh in loop
+ await video.setAsRefreshed()
+ return video
+ }
+}
+
export {
updateVideoFromAP,
+ refreshVideoIfNeeded,
federateVideoIfNeeded,
fetchRemoteVideo,
getOrCreateVideoAndAccountAndChannel,
return videoCreated
}
-async function refreshVideoIfNeeded (options: {
- video: VideoModel,
- fetchedType: VideoFetchByUrlType,
- syncParam: SyncParam,
- refreshViews: boolean
-}): Promise<VideoModel> {
- if (!options.video.isOutdated()) return options.video
-
- // We need more attributes if the argument video was fetched with not enough joints
- const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url)
-
- try {
- const { response, videoObject } = await fetchRemoteVideo(video.url)
- if (response.statusCode === 404) {
- logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url)
-
- // Video does not exist anymore
- await video.destroy()
- return undefined
- }
-
- if (videoObject === undefined) {
- logger.warn('Cannot refresh remote video %s: invalid body.', video.url)
- return video
- }
-
- const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject)
- const account = await AccountModel.load(channelActor.VideoChannel.accountId)
-
- const updateOptions = {
- video,
- videoObject,
- account,
- channel: channelActor.VideoChannel,
- updateViews: options.refreshViews
- }
- await retryTransactionWrapper(updateVideoFromAP, updateOptions)
- await syncVideoExternalAttributes(video, videoObject, options.syncParam)
-
- return video
- } catch (err) {
- logger.warn('Cannot refresh video %s.', options.video.url, { err })
- return video
- }
-}
-
async function videoActivityObjectToDBAttributes (
videoChannel: VideoChannelModel,
videoObject: VideoTorrentObject,
--- /dev/null
+import * as Bull from 'bull'
+import { logger } from '../../../helpers/logger'
+import { fetchVideoByUrl } from '../../../helpers/video'
+import { refreshVideoIfNeeded } from '../../activitypub'
+
+export type RefreshPayload = {
+ videoUrl: string
+ type: 'video'
+}
+
+async function refreshAPObject (job: Bull.Job) {
+ const payload = job.data as RefreshPayload
+ logger.info('Processing AP refresher in job %d.', job.id)
+
+ if (payload.type === 'video') return refreshAPVideo(payload.videoUrl)
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+ refreshAPObject
+}
+
+// ---------------------------------------------------------------------------
+
+async function refreshAPVideo (videoUrl: string) {
+ const fetchType = 'all' as 'all'
+ const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true }
+
+ const videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType)
+ if (videoFromDatabase) {
+ const refreshOptions = {
+ video: videoFromDatabase,
+ fetchedType: fetchType,
+ syncParam
+ }
+
+ await refreshVideoIfNeeded(refreshOptions)
+ }
+}
import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
import { processVideoImport, VideoImportPayload } from './handlers/video-import'
import { processVideosViews } from './handlers/video-views'
+import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher'
type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'video-file', payload: VideoFilePayload } |
{ type: 'email', payload: EmailPayload } |
{ type: 'video-import', payload: VideoImportPayload } |
+ { type: 'activitypub-refresher', payload: RefreshPayload } |
{ type: 'videos-views', payload: {} }
const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
'video-file': processVideoFile,
'email': processEmail,
'video-import': processVideoImport,
- 'videos-views': processVideosViews
+ 'videos-views': processVideosViews,
+ 'activitypub-refresher': refreshAPObject
}
const jobTypes: JobType[] = [
'video-file',
'video-file-import',
'video-import',
- 'videos-views'
+ 'videos-views',
+ 'activitypub-refresher'
]
class JobQueue {
(now - updatedAtTime) > ACTIVITY_PUB.VIDEO_REFRESH_INTERVAL
}
+ setAsRefreshed () {
+ this.changed('updatedAt', true)
+
+ return this.save()
+ }
+
getBaseUrls () {
let baseUrlHttp
let baseUrlWs
import './client'
import './fetch'
import './helpers'
+import './refresher'
import './security'
--- /dev/null
+/* tslint:disable:no-unused-expression */
+
+import 'mocha'
+import { doubleFollow, getVideo, reRunServer } from '../../utils'
+import { flushAndRunMultipleServers, killallServers, ServerInfo, setAccessTokensToServers, uploadVideo, wait } from '../../utils/index'
+import { waitJobs } from '../../utils/server/jobs'
+import { setVideoField } from '../../utils/miscs/sql'
+
+describe('Test AP refresher', function () {
+ let servers: ServerInfo[] = []
+ let videoUUID1: string
+ let videoUUID2: string
+ let videoUUID3: string
+
+ before(async function () {
+ this.timeout(30000)
+
+ servers = await flushAndRunMultipleServers(2)
+
+ // Get the access tokens
+ await setAccessTokensToServers(servers)
+
+ {
+ const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video1' })
+ videoUUID1 = res.body.video.uuid
+ }
+
+ {
+ const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video2' })
+ videoUUID2 = res.body.video.uuid
+ }
+
+ {
+ const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video3' })
+ videoUUID3 = res.body.video.uuid
+ }
+
+ await doubleFollow(servers[0], servers[1])
+ })
+
+ it('Should remove a deleted remote video', async function () {
+ this.timeout(60000)
+
+ await wait(10000)
+
+ // Change UUID so the remote server returns a 404
+ await setVideoField(2, videoUUID1, 'uuid', '304afe4f-39f9-4d49-8ed7-ac57b86b174f')
+
+ await getVideo(servers[0].url, videoUUID1)
+ await getVideo(servers[0].url, videoUUID2)
+
+ await waitJobs(servers)
+
+ await getVideo(servers[0].url, videoUUID1, 404)
+ await getVideo(servers[0].url, videoUUID2, 200)
+ })
+
+ it('Should not update a remote video if the remote instance is down', async function () {
+ this.timeout(60000)
+
+ killallServers([ servers[1] ])
+
+ await setVideoField(2, videoUUID3, 'uuid', '304afe4f-39f9-4d49-8ed7-ac57b86b174e')
+
+ // Video will need a refresh
+ await wait(10000)
+
+ await getVideo(servers[0].url, videoUUID3)
+ // The refresh should fail
+ await waitJobs([ servers[0] ])
+
+ await reRunServer(servers[1])
+
+ // Should not refresh the video, even if the last refresh failed (to avoir a loop on dead instances)
+ await getVideo(servers[0].url, videoUUID3)
+ await waitJobs(servers)
+
+ await getVideo(servers[0].url, videoUUID3, 200)
+ })
+
+ after(async function () {
+ killallServers(servers)
+ })
+})
'video-file' |
'email' |
'video-import' |
- 'videos-views'
+ 'videos-views' |
+ 'activitypub-refresher'
export interface Job {
id: number