} from './server/controllers'
import { advertiseDoNotTrack } from './server/middlewares/dnt'
import { Redis } from './server/lib/redis'
-import { BadActorFollowScheduler } from './server/lib/schedulers/bad-actor-follow-scheduler'
+import { ActorFollowScheduler } from './server/lib/schedulers/actor-follow-scheduler'
import { RemoveOldJobsScheduler } from './server/lib/schedulers/remove-old-jobs-scheduler'
import { UpdateVideosScheduler } from './server/lib/schedulers/update-videos-scheduler'
import { YoutubeDlUpdateScheduler } from './server/lib/schedulers/youtube-dl-update-scheduler'
VideosCaptionCache.Instance.init(CONFIG.CACHE.VIDEO_CAPTIONS.SIZE, CACHE.VIDEO_CAPTIONS.MAX_AGE)
// Enable Schedulers
- BadActorFollowScheduler.Instance.enable()
+ ActorFollowScheduler.Instance.enable()
RemoveOldJobsScheduler.Instance.enable()
UpdateVideosScheduler.Instance.enable()
YoutubeDlUpdateScheduler.Instance.enable()
// 1 hour
let SCHEDULER_INTERVALS_MS = {
- badActorFollow: 60000 * 60, // 1 hour
+ actorFollowScores: 60000 * 60, // 1 hour
removeOldJobs: 60000 * 60, // 1 hour
updateVideos: 60000, // 1 minute
youtubeDLUpdate: 60000 * 60 * 24 // 1 day
CONSTRAINTS_FIELDS.ACTORS.AVATAR.FILE_SIZE.max = 100 * 1024 // 100KB
- SCHEDULER_INTERVALS_MS.badActorFollow = 10000
+ SCHEDULER_INTERVALS_MS.actorFollowScores = 1000
SCHEDULER_INTERVALS_MS.removeOldJobs = 10000
SCHEDULER_INTERVALS_MS.updateVideos = 5000
REPEAT_JOBS['videos-views'] = { every: 5000 }
--- /dev/null
+import { ACTOR_FOLLOW_SCORE } from '../../initializers'
+import { logger } from '../../helpers/logger'
+
+// Cache follows scores, instead of writing them too often in database
+// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores
+class ActorFollowScoreCache {
+
+ private static instance: ActorFollowScoreCache
+ private pendingFollowsScore: { [ url: string ]: number } = {}
+
+ private constructor () {}
+
+ static get Instance () {
+ return this.instance || (this.instance = new this())
+ }
+
+ updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) {
+ if (goodInboxes.length === 0 && badInboxes.length === 0) return
+
+ logger.info('Updating %d good actor follows and %d bad actor follows scores in cache.', goodInboxes.length, badInboxes.length)
+
+ for (const goodInbox of goodInboxes) {
+ if (this.pendingFollowsScore[goodInbox] === undefined) this.pendingFollowsScore[goodInbox] = 0
+
+ this.pendingFollowsScore[goodInbox] += ACTOR_FOLLOW_SCORE.BONUS
+ }
+
+ for (const badInbox of badInboxes) {
+ if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0
+
+ this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY
+ }
+ }
+
+ getPendingFollowsScoreCopy () {
+ return this.pendingFollowsScore
+ }
+
+ clearPendingFollowsScore () {
+ this.pendingFollowsScore = {}
+ }
+}
+
+export {
+ ActorFollowScoreCache
+}
+export * from './actor-follow-score-cache'
export * from './videos-preview-cache'
export * from './videos-caption-cache'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers'
+import { ActorFollowScoreCache } from '../../cache'
export type ActivitypubHttpBroadcastPayload = {
uris: string[]
.catch(() => badUrls.push(uri))
}, { concurrency: BROADCAST_CONCURRENCY })
- return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined)
+ return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls)
}
// ---------------------------------------------------------------------------
import * as Bull from 'bull'
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
-import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
import { JOB_REQUEST_TIMEOUT } from '../../../initializers'
+import { ActorFollowScoreCache } from '../../cache'
export type ActivitypubHttpUnicastPayload = {
uri: string
try {
await doRequest(options)
- ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined)
+ ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], [])
} catch (err) {
- ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined)
+ ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ])
throw err
}
return total
}
- removeOldJobs () {
+ async removeOldJobs () {
for (const key of Object.keys(this.queues)) {
const queue = this.queues[key]
- queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
+ await queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
}
}
+import { logger } from '../../helpers/logger'
+
export abstract class AbstractScheduler {
protected abstract schedulerIntervalMs: number
private interval: NodeJS.Timer
+ private isRunning = false
enable () {
if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.')
clearInterval(this.interval)
}
- abstract execute ()
+ async execute () {
+ if (this.isRunning === true) return
+ this.isRunning = true
+
+ try {
+ await this.internalExecute()
+ } catch (err) {
+ logger.error('Cannot execute %s scheduler.', this.constructor.name, { err })
+ } finally {
+ this.isRunning = false
+ }
+ }
+
+ protected abstract internalExecute (): Promise<any>
}
--- /dev/null
+import { isTestInstance } from '../../helpers/core-utils'
+import { logger } from '../../helpers/logger'
+import { ActorFollowModel } from '../../models/activitypub/actor-follow'
+import { AbstractScheduler } from './abstract-scheduler'
+import { SCHEDULER_INTERVALS_MS } from '../../initializers'
+import { ActorFollowScoreCache } from '../cache'
+
+export class ActorFollowScheduler extends AbstractScheduler {
+
+ private static instance: AbstractScheduler
+
+ protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.actorFollowScores
+
+ private constructor () {
+ super()
+ }
+
+ protected async internalExecute () {
+ await this.processPendingScores()
+
+ await this.removeBadActorFollows()
+ }
+
+ private async processPendingScores () {
+ const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScoreCopy()
+
+ ActorFollowScoreCache.Instance.clearPendingFollowsScore()
+
+ for (const inbox of Object.keys(pendingScores)) {
+ await ActorFollowModel.updateFollowScore(inbox, pendingScores[inbox])
+ }
+ }
+
+ private async removeBadActorFollows () {
+ if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).')
+
+ try {
+ await ActorFollowModel.removeBadActorFollows()
+ } catch (err) {
+ logger.error('Error in bad actor follows scheduler.', { err })
+ }
+ }
+
+ static get Instance () {
+ return this.instance || (this.instance = new this())
+ }
+}
+++ /dev/null
-import { isTestInstance } from '../../helpers/core-utils'
-import { logger } from '../../helpers/logger'
-import { ActorFollowModel } from '../../models/activitypub/actor-follow'
-import { AbstractScheduler } from './abstract-scheduler'
-import { SCHEDULER_INTERVALS_MS } from '../../initializers'
-
-export class BadActorFollowScheduler extends AbstractScheduler {
-
- private static instance: AbstractScheduler
-
- protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.badActorFollow
-
- private constructor () {
- super()
- }
-
- async execute () {
- if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).')
-
- try {
- await ActorFollowModel.removeBadActorFollows()
- } catch (err) {
- logger.error('Error in bad actor follows scheduler.', { err })
- }
- }
-
- static get Instance () {
- return this.instance || (this.instance = new this())
- }
-}
super()
}
- async execute () {
- if (!isTestInstance()) logger.info('Removing old jobs (scheduler).')
+ protected internalExecute () {
+ if (!isTestInstance()) logger.info('Removing old jobs in scheduler.')
- JobQueue.Instance.removeOldJobs()
+ return JobQueue.Instance.removeOldJobs()
}
static get Instance () {
protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos
- private isRunning = false
-
private constructor () {
super()
}
- async execute () {
- if (this.isRunning === true) return
- this.isRunning = true
-
- try {
- await retryTransactionWrapper(this.updateVideos.bind(this))
- } catch (err) {
- logger.error('Cannot execute update videos scheduler.', { err })
- } finally {
- this.isRunning = false
- }
+ protected async internalExecute () {
+ return retryTransactionWrapper(this.updateVideos.bind(this))
}
private async updateVideos () {
export class VideosRedundancyScheduler extends AbstractScheduler {
private static instance: AbstractScheduler
- private executing = false
protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL
super()
}
- async execute () {
- if (this.executing) return
-
- this.executing = true
-
+ protected async internalExecute () {
for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
logger.info('Running redundancy scheduler for strategy %s.', obj.strategy)
await this.extendsLocalExpiration()
await this.purgeRemoteExpired()
-
- this.executing = false
}
static get Instance () {
super()
}
- execute () {
+ protected internalExecute () {
return updateYoutubeDLBinary()
}
if (numberOfActorFollowsRemoved) logger.info('Removed bad %d actor follows.', numberOfActorFollowsRemoved)
}
- static updateActorFollowsScore (goodInboxes: string[], badInboxes: string[], t: Sequelize.Transaction | undefined) {
- if (goodInboxes.length === 0 && badInboxes.length === 0) return
-
- logger.info('Updating %d good actor follows and %d bad actor follows scores.', goodInboxes.length, badInboxes.length)
-
- if (goodInboxes.length !== 0) {
- ActorFollowModel.incrementScores(goodInboxes, ACTOR_FOLLOW_SCORE.BONUS, t)
- .catch(err => logger.error('Cannot increment scores of good actor follows.', { err }))
- }
-
- if (badInboxes.length !== 0) {
- ActorFollowModel.incrementScores(badInboxes, ACTOR_FOLLOW_SCORE.PENALTY, t)
- .catch(err => logger.error('Cannot decrement scores of bad actor follows.', { err }))
- }
- }
-
static loadByActorAndTarget (actorId: number, targetActorId: number, t?: Sequelize.Transaction) {
const query = {
where: {
}
}
+ static updateFollowScore (inboxUrl: string, value: number, t?: Sequelize.Transaction) {
+ const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` +
+ 'WHERE id IN (' +
+ 'SELECT "actorFollow"."id" FROM "actorFollow" ' +
+ 'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' +
+ `WHERE "actor"."inboxUrl" = '${inboxUrl}' OR "actor"."sharedInboxUrl" = '${inboxUrl}'` +
+ ')'
+
+ const options = {
+ type: Sequelize.QueryTypes.BULKUPDATE,
+ transaction: t
+ }
+
+ return ActorFollowModel.sequelize.query(query, options)
+ }
+
private static async createListAcceptedFollowForApiQuery (
type: 'followers' | 'following',
actorIds: number[],
}
}
- private static incrementScores (inboxUrls: string[], value: number, t: Sequelize.Transaction | undefined) {
- const inboxUrlsString = inboxUrls.map(url => `'${url}'`).join(',')
-
- const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` +
- 'WHERE id IN (' +
- 'SELECT "actorFollow"."id" FROM "actorFollow" ' +
- 'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' +
- 'WHERE "actor"."inboxUrl" IN (' + inboxUrlsString + ') OR "actor"."sharedInboxUrl" IN (' + inboxUrlsString + ')' +
- ')'
-
- const options = t ? {
- type: Sequelize.QueryTypes.BULKUPDATE,
- transaction: t
- } : undefined
-
- return ActorFollowModel.sequelize.query(query, options)
- }
-
private static listBadActorFollows () {
const query = {
where: {