Optimize actor follow scores modifications
author Chocobozzz <me@florianbigard.com>
Thu, 20 Dec 2018 13:31:11 +0000 (14:31 +0100)
committer Chocobozzz <me@florianbigard.com>
Thu, 20 Dec 2018 13:31:11 +0000 (14:31 +0100)
15 files changed:
server.ts
server/initializers/constants.ts
server/lib/cache/actor-follow-score-cache.ts [new file with mode: 0644]
server/lib/cache/index.ts
server/lib/job-queue/handlers/activitypub-http-broadcast.ts
server/lib/job-queue/handlers/activitypub-http-unicast.ts
server/lib/job-queue/job-queue.ts
server/lib/schedulers/abstract-scheduler.ts
server/lib/schedulers/actor-follow-scheduler.ts [new file with mode: 0644]
server/lib/schedulers/bad-actor-follow-scheduler.ts [deleted file]
server/lib/schedulers/remove-old-jobs-scheduler.ts
server/lib/schedulers/update-videos-scheduler.ts
server/lib/schedulers/videos-redundancy-scheduler.ts
server/lib/schedulers/youtube-dl-update-scheduler.ts
server/models/activitypub/actor-follow.ts

index 6dff16f46214eb33eec2b4b485ea68ff1d942c92..868a03ba49b6c2a9667003f465b1e8cb4a03e7e3 100644 (file)
--- a/server.ts
+++ b/server.ts
@@ -94,7 +94,7 @@ import {
 } from './server/controllers'
 import { advertiseDoNotTrack } from './server/middlewares/dnt'
 import { Redis } from './server/lib/redis'
-import { BadActorFollowScheduler } from './server/lib/schedulers/bad-actor-follow-scheduler'
+import { ActorFollowScheduler } from './server/lib/schedulers/actor-follow-scheduler'
 import { RemoveOldJobsScheduler } from './server/lib/schedulers/remove-old-jobs-scheduler'
 import { UpdateVideosScheduler } from './server/lib/schedulers/update-videos-scheduler'
 import { YoutubeDlUpdateScheduler } from './server/lib/schedulers/youtube-dl-update-scheduler'
@@ -219,7 +219,7 @@ async function startApplication () {
   VideosCaptionCache.Instance.init(CONFIG.CACHE.VIDEO_CAPTIONS.SIZE, CACHE.VIDEO_CAPTIONS.MAX_AGE)
 
   // Enable Schedulers
-  BadActorFollowScheduler.Instance.enable()
+  ActorFollowScheduler.Instance.enable()
   RemoveOldJobsScheduler.Instance.enable()
   UpdateVideosScheduler.Instance.enable()
   YoutubeDlUpdateScheduler.Instance.enable()
index b326a6c7bb4b8349a71b087432439d2f5bcbc626..1c27a9f6b9710c2e2dc1b0ee8687087d68961f14 100644 (file)
@@ -144,7 +144,7 @@ const VIDEO_IMPORT_TIMEOUT = 1000 * 3600 // 1 hour
 
 // 1 hour
 let SCHEDULER_INTERVALS_MS = {
-  badActorFollow: 60000 * 60, // 1 hour
+  actorFollowScores: 60000 * 60, // 1 hour
   removeOldJobs: 60000 * 60, // 1 hour
   updateVideos: 60000, // 1 minute
   youtubeDLUpdate: 60000 * 60 * 24 // 1 day
@@ -675,7 +675,7 @@ if (isTestInstance() === true) {
 
   CONSTRAINTS_FIELDS.ACTORS.AVATAR.FILE_SIZE.max = 100 * 1024 // 100KB
 
-  SCHEDULER_INTERVALS_MS.badActorFollow = 10000
+  SCHEDULER_INTERVALS_MS.actorFollowScores = 1000
   SCHEDULER_INTERVALS_MS.removeOldJobs = 10000
   SCHEDULER_INTERVALS_MS.updateVideos = 5000
   REPEAT_JOBS['videos-views'] = { every: 5000 }
diff --git a/server/lib/cache/actor-follow-score-cache.ts b/server/lib/cache/actor-follow-score-cache.ts
new file mode 100644 (file)
index 0000000..d070bde
--- /dev/null
@@ -0,0 +1,46 @@
+import { ACTOR_FOLLOW_SCORE } from '../../initializers'
+import { logger } from '../../helpers/logger'
+
+// Cache follow scores instead of writing them to the database too often
+// Keep data in memory: we don't really need Redis here, as we don't really care about losing some scores
+class ActorFollowScoreCache {
+
+  private static instance: ActorFollowScoreCache
+  private pendingFollowsScore: { [ url: string ]: number } = {}
+
+  private constructor () {}
+
+  static get Instance () {
+    return this.instance || (this.instance = new this())
+  }
+
+  updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) {
+    if (goodInboxes.length === 0 && badInboxes.length === 0) return
+
+    logger.info('Updating %d good actor follows and %d bad actor follows scores in cache.', goodInboxes.length, badInboxes.length)
+
+    for (const goodInbox of goodInboxes) {
+      if (this.pendingFollowsScore[goodInbox] === undefined) this.pendingFollowsScore[goodInbox] = 0
+
+      this.pendingFollowsScore[goodInbox] += ACTOR_FOLLOW_SCORE.BONUS
+    }
+
+    for (const badInbox of badInboxes) {
+      if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0
+
+      this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY
+    }
+  }
+
+  getPendingFollowsScoreCopy () {
+    return this.pendingFollowsScore
+  }
+
+  clearPendingFollowsScore () {
+    this.pendingFollowsScore = {}
+  }
+}
+
+export {
+  ActorFollowScoreCache
+}
index 54eb983fa16ec1c970982399562c08c5d4891f57..e921d04a719f168c6d2028c1ef663f368b92355f 100644 (file)
@@ -1,2 +1,3 @@
+export * from './actor-follow-score-cache'
 export * from './videos-preview-cache'
 export * from './videos-caption-cache'
index abbd89b3b1a8f4f5f39bf44083294f0acbddaf5b..9493945ff6d9b0aadfc2514c2b4e0cb09b74ccef 100644 (file)
@@ -5,6 +5,7 @@ import { doRequest } from '../../../helpers/requests'
 import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
 import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
 import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers'
+import { ActorFollowScoreCache } from '../../cache'
 
 export type ActivitypubHttpBroadcastPayload = {
   uris: string[]
@@ -38,7 +39,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) {
       .catch(() => badUrls.push(uri))
   }, { concurrency: BROADCAST_CONCURRENCY })
 
-  return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined)
+  return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls)
 }
 
 // ---------------------------------------------------------------------------
index d36479032849d5cfc635b54cf8e1cc188c09cdb3..3973dcdc8cc7dced56a3977a6e916b99cc24b612 100644 (file)
@@ -1,9 +1,9 @@
 import * as Bull from 'bull'
 import { logger } from '../../../helpers/logger'
 import { doRequest } from '../../../helpers/requests'
-import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
 import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
 import { JOB_REQUEST_TIMEOUT } from '../../../initializers'
+import { ActorFollowScoreCache } from '../../cache'
 
 export type ActivitypubHttpUnicastPayload = {
   uri: string
@@ -31,9 +31,9 @@ async function processActivityPubHttpUnicast (job: Bull.Job) {
 
   try {
     await doRequest(options)
-    ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined)
+    ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], [])
   } catch (err) {
-    ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined)
+    ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ])
 
     throw err
   }
index e34be7dcddecbc3b24a8664de559392ce7f660e0..ba9cbe0d99d64b9c0e3f10a579acfec291510180 100644 (file)
@@ -165,10 +165,10 @@ class JobQueue {
     return total
   }
 
-  removeOldJobs () {
+  async removeOldJobs () {
     for (const key of Object.keys(this.queues)) {
       const queue = this.queues[key]
-      queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
+      await queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
     }
   }
 
index b9d0a4d177b715a8fac48a6df7108afab49294db..86ea7aa38bd931ea04d917447ca02c0b989e011a 100644 (file)
@@ -1,8 +1,11 @@
+import { logger } from '../../helpers/logger'
+
 export abstract class AbstractScheduler {
 
   protected abstract schedulerIntervalMs: number
 
   private interval: NodeJS.Timer
+  private isRunning = false
 
   enable () {
     if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.')
@@ -14,5 +17,18 @@ export abstract class AbstractScheduler {
     clearInterval(this.interval)
   }
 
-  abstract execute ()
+  async execute () {
+    if (this.isRunning === true) return
+    this.isRunning = true
+
+    try {
+      await this.internalExecute()
+    } catch (err) {
+      logger.error('Cannot execute %s scheduler.', this.constructor.name, { err })
+    } finally {
+      this.isRunning = false
+    }
+  }
+
+  protected abstract internalExecute (): Promise<any>
 }
diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts
new file mode 100644 (file)
index 0000000..3967be7
--- /dev/null
@@ -0,0 +1,47 @@
+import { isTestInstance } from '../../helpers/core-utils'
+import { logger } from '../../helpers/logger'
+import { ActorFollowModel } from '../../models/activitypub/actor-follow'
+import { AbstractScheduler } from './abstract-scheduler'
+import { SCHEDULER_INTERVALS_MS } from '../../initializers'
+import { ActorFollowScoreCache } from '../cache'
+
+export class ActorFollowScheduler extends AbstractScheduler {
+
+  private static instance: AbstractScheduler
+
+  protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.actorFollowScores
+
+  private constructor () {
+    super()
+  }
+
+  protected async internalExecute () {
+    await this.processPendingScores()
+
+    await this.removeBadActorFollows()
+  }
+
+  private async processPendingScores () {
+    const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScoreCopy()
+
+    ActorFollowScoreCache.Instance.clearPendingFollowsScore()
+
+    for (const inbox of Object.keys(pendingScores)) {
+      await ActorFollowModel.updateFollowScore(inbox, pendingScores[inbox])
+    }
+  }
+
+  private async removeBadActorFollows () {
+    if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).')
+
+    try {
+      await ActorFollowModel.removeBadActorFollows()
+    } catch (err) {
+      logger.error('Error in bad actor follows scheduler.', { err })
+    }
+  }
+
+  static get Instance () {
+    return this.instance || (this.instance = new this())
+  }
+}
diff --git a/server/lib/schedulers/bad-actor-follow-scheduler.ts b/server/lib/schedulers/bad-actor-follow-scheduler.ts
deleted file mode 100644 (file)
index 617149a..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-import { isTestInstance } from '../../helpers/core-utils'
-import { logger } from '../../helpers/logger'
-import { ActorFollowModel } from '../../models/activitypub/actor-follow'
-import { AbstractScheduler } from './abstract-scheduler'
-import { SCHEDULER_INTERVALS_MS } from '../../initializers'
-
-export class BadActorFollowScheduler extends AbstractScheduler {
-
-  private static instance: AbstractScheduler
-
-  protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.badActorFollow
-
-  private constructor () {
-    super()
-  }
-
-  async execute () {
-    if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).')
-
-    try {
-      await ActorFollowModel.removeBadActorFollows()
-    } catch (err) {
-      logger.error('Error in bad actor follows scheduler.', { err })
-    }
-  }
-
-  static get Instance () {
-    return this.instance || (this.instance = new this())
-  }
-}
index a29a6b80091bf1bc811acf2636f17c2e301a91fb..4a4341ba981250938aa93bd84ce81314a617735f 100644 (file)
@@ -14,10 +14,10 @@ export class RemoveOldJobsScheduler extends AbstractScheduler {
     super()
   }
 
-  async execute () {
-    if (!isTestInstance()) logger.info('Removing old jobs (scheduler).')
+  protected internalExecute () {
+    if (!isTestInstance()) logger.info('Removing old jobs in scheduler.')
 
-    JobQueue.Instance.removeOldJobs()
+    return JobQueue.Instance.removeOldJobs()
   }
 
   static get Instance () {
index fd2edfd1702007665ce8cf03279db14194547c30..21f071f9eed4f47552670068181b22e57661ffe2 100644 (file)
@@ -12,23 +12,12 @@ export class UpdateVideosScheduler extends AbstractScheduler {
 
   protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos
 
-  private isRunning = false
-
   private constructor () {
     super()
   }
 
-  async execute () {
-    if (this.isRunning === true) return
-    this.isRunning = true
-
-    try {
-      await retryTransactionWrapper(this.updateVideos.bind(this))
-    } catch (err) {
-      logger.error('Cannot execute update videos scheduler.', { err })
-    } finally {
-      this.isRunning = false
-    }
+  protected async internalExecute () {
+    return retryTransactionWrapper(this.updateVideos.bind(this))
   }
 
   private async updateVideos () {
index 15e094d39dba5fedcebeb126a9564c36f23948fa..f643ee2268d38969e2403614949cd29a6b39eb42 100644 (file)
@@ -16,7 +16,6 @@ import { getOrCreateVideoAndAccountAndChannel } from '../activitypub'
 export class VideosRedundancyScheduler extends AbstractScheduler {
 
   private static instance: AbstractScheduler
-  private executing = false
 
   protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL
 
@@ -24,11 +23,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
     super()
   }
 
-  async execute () {
-    if (this.executing) return
-
-    this.executing = true
-
+  protected async internalExecute () {
     for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
       logger.info('Running redundancy scheduler for strategy %s.', obj.strategy)
 
@@ -57,8 +52,6 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
     await this.extendsLocalExpiration()
 
     await this.purgeRemoteExpired()
-
-    this.executing = false
   }
 
   static get Instance () {
index 461cd045ef1256bc9f83a36c334c6d8774ac1085..aa027116d7c93b18fd0573538fcd36c1f9db4402 100644 (file)
@@ -12,7 +12,7 @@ export class YoutubeDlUpdateScheduler extends AbstractScheduler {
     super()
   }
 
-  execute () {
+  protected internalExecute () {
     return updateYoutubeDLBinary()
   }
 
index 0a693508354549ff3fbd46d55a9014211535cdb1..994f791de2a6711f08074d04c562a9f583ff96ec 100644 (file)
@@ -127,22 +127,6 @@ export class ActorFollowModel extends Model<ActorFollowModel> {
     if (numberOfActorFollowsRemoved) logger.info('Removed bad %d actor follows.', numberOfActorFollowsRemoved)
   }
 
-  static updateActorFollowsScore (goodInboxes: string[], badInboxes: string[], t: Sequelize.Transaction | undefined) {
-    if (goodInboxes.length === 0 && badInboxes.length === 0) return
-
-    logger.info('Updating %d good actor follows and %d bad actor follows scores.', goodInboxes.length, badInboxes.length)
-
-    if (goodInboxes.length !== 0) {
-      ActorFollowModel.incrementScores(goodInboxes, ACTOR_FOLLOW_SCORE.BONUS, t)
-        .catch(err => logger.error('Cannot increment scores of good actor follows.', { err }))
-    }
-
-    if (badInboxes.length !== 0) {
-      ActorFollowModel.incrementScores(badInboxes, ACTOR_FOLLOW_SCORE.PENALTY, t)
-        .catch(err => logger.error('Cannot decrement scores of bad actor follows.', { err }))
-    }
-  }
-
   static loadByActorAndTarget (actorId: number, targetActorId: number, t?: Sequelize.Transaction) {
     const query = {
       where: {
@@ -464,6 +448,22 @@ export class ActorFollowModel extends Model<ActorFollowModel> {
     }
   }
 
+  static updateFollowScore (inboxUrl: string, value: number, t?: Sequelize.Transaction) {
+    const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` +
+      'WHERE id IN (' +
+      'SELECT "actorFollow"."id" FROM "actorFollow" ' +
+      'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' +
+      `WHERE "actor"."inboxUrl" = '${inboxUrl}' OR "actor"."sharedInboxUrl" = '${inboxUrl}'` +
+      ')'
+
+    const options = {
+      type: Sequelize.QueryTypes.BULKUPDATE,
+      transaction: t
+    }
+
+    return ActorFollowModel.sequelize.query(query, options)
+  }
+
   private static async createListAcceptedFollowForApiQuery (
     type: 'followers' | 'following',
     actorIds: number[],
@@ -518,24 +518,6 @@ export class ActorFollowModel extends Model<ActorFollowModel> {
     }
   }
 
-  private static incrementScores (inboxUrls: string[], value: number, t: Sequelize.Transaction | undefined) {
-    const inboxUrlsString = inboxUrls.map(url => `'${url}'`).join(',')
-
-    const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` +
-      'WHERE id IN (' +
-        'SELECT "actorFollow"."id" FROM "actorFollow" ' +
-        'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' +
-        'WHERE "actor"."inboxUrl" IN (' + inboxUrlsString + ') OR "actor"."sharedInboxUrl" IN (' + inboxUrlsString + ')' +
-      ')'
-
-    const options = t ? {
-      type: Sequelize.QueryTypes.BULKUPDATE,
-      transaction: t
-    } : undefined
-
-    return ActorFollowModel.sequelize.query(query, options)
-  }
-
   private static listBadActorFollows () {
     const query = {
       where: {