import { YoutubeDlUpdateScheduler } from './server/lib/schedulers/youtube-dl-update-scheduler'
import { VideosRedundancyScheduler } from './server/lib/schedulers/videos-redundancy-scheduler'
import { RemoveOldHistoryScheduler } from './server/lib/schedulers/remove-old-history-scheduler'
+import { AutoFollowIndexInstances } from './server/lib/schedulers/auto-follow-index-instances'
import { isHTTPSignatureDigestValid } from './server/helpers/peertube-crypto'
import { PeerTubeSocket } from './server/lib/peertube-socket'
import { updateStreamingPlaylistsInfohashesIfNeeded } from './server/lib/hls'
RemoveOldHistoryScheduler.Instance.enable()
RemoveOldViewsScheduler.Instance.enable()
PluginsCheckScheduler.Instance.enable()
+ AutoFollowIndexInstances.Instance.enable()
// Redis initialization
Redis.Instance.init()
updateVideos: 60000, // 1 minute
youtubeDLUpdate: 60000 * 60 * 24, // 1 day
checkPlugins: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL,
+ autoFollowIndexInstances: 60000 * 60 * 24, // 1 day
removeOldViews: 60000 * 60 * 24, // 1 day
removeOldHistory: 60000 * 60 * 24 // 1 day
}
+const INSTANCES_INDEX = {
+ HOSTS_PATH: '/api/v1/instances/hosts'
+}
+
// ---------------------------------------------------------------------------
const CONSTRAINTS_FIELDS = {
SCHEDULER_INTERVALS_MS.removeOldHistory = 5000
SCHEDULER_INTERVALS_MS.removeOldViews = 5000
SCHEDULER_INTERVALS_MS.updateVideos = 5000
+ SCHEDULER_INTERVALS_MS.autoFollowIndexInstances = 5000
REPEAT_JOBS[ 'videos-views' ] = { every: 5000 }
REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
PREVIEWS_SIZE,
REMOTE_SCHEME,
FOLLOW_STATES,
+ INSTANCES_INDEX,
DEFAULT_USER_THEME_NAME,
SERVER_ACTOR_NAME,
PLUGIN_GLOBAL_CSS_FILE_NAME,
--- /dev/null
+import { logger } from '../../helpers/logger'
+import { AbstractScheduler } from './abstract-scheduler'
+import { INSTANCES_INDEX, SCHEDULER_INTERVALS_MS, SERVER_ACTOR_NAME } from '../../initializers/constants'
+import { CONFIG } from '../../initializers/config'
+import { chunk } from 'lodash'
+import { doRequest } from '@server/helpers/requests'
+import { ActorFollowModel } from '@server/models/activitypub/actor-follow'
+import { JobQueue } from '@server/lib/job-queue'
+import { getServerActor } from '@server/helpers/utils'
+
+export class AutoFollowIndexInstances extends AbstractScheduler {
+
+ private static instance: AbstractScheduler
+
+ protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.autoFollowIndexInstances
+
+ private lastCheck: Date
+
+ private constructor () {
+ super()
+ }
+
+ protected async internalExecute () {
+ return this.autoFollow()
+ }
+
+ private async autoFollow () {
+ if (CONFIG.FOLLOWINGS.INSTANCE.AUTO_FOLLOW_INDEX.ENABLED === false) return
+
+ const indexUrl = CONFIG.FOLLOWINGS.INSTANCE.AUTO_FOLLOW_INDEX.INDEX_URL
+
+ logger.info('Auto follow instances of index %s.', indexUrl)
+
+ try {
+ const serverActor = await getServerActor()
+
+ const uri = indexUrl + INSTANCES_INDEX.HOSTS_PATH
+
+ const qs = this.lastCheck ? { since: this.lastCheck.toISOString() } : {}
+ this.lastCheck = new Date()
+
+ const { body } = await doRequest({ uri, qs, json: true })
+
+ const hosts: string[] = body.data.map(o => o.host)
+ const chunks = chunk(hosts, 20)
+
+ for (const chunk of chunks) {
+ const unfollowedHosts = await ActorFollowModel.keepUnfollowedInstance(chunk)
+
+ for (const unfollowedHost of unfollowedHosts) {
+ const payload = {
+ host: unfollowedHost,
+ name: SERVER_ACTOR_NAME,
+ followerActorId: serverActor.id,
+ isAutoFollow: true
+ }
+
+ await JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
+ .catch(err => logger.error('Cannot create follow job for %s.', unfollowedHost, err))
+ }
+ }
+
+ } catch (err) {
+ logger.error('Cannot auto follow hosts of index %s.', indexUrl, { err })
+ }
+
+ }
+
+ static get Instance () {
+ return this.instance || (this.instance = new this())
+ }
+}
import * as Bluebird from 'bluebird'
-import { values } from 'lodash'
+import { values, difference } from 'lodash'
import {
AfterCreate,
AfterDestroy,
import { ActorFollow } from '../../../shared/models/actors/follow.model'
import { logger } from '../../helpers/logger'
import { getServerActor } from '../../helpers/utils'
-import { ACTOR_FOLLOW_SCORE, FOLLOW_STATES } from '../../initializers/constants'
+import { ACTOR_FOLLOW_SCORE, FOLLOW_STATES, SERVER_ACTOR_NAME } from '../../initializers/constants'
import { ServerModel } from '../server/server'
import { createSafeIn, getSort } from '../utils'
import { ActorModel, unusedActorAttributesForAPI } from './actor'
})
}
+ static async keepUnfollowedInstance (hosts: string[]) {
+ const followerId = (await getServerActor()).id
+
+ const query = {
+ attributes: [],
+ where: {
+ actorId: followerId
+ },
+ include: [
+ {
+ attributes: [ ],
+ model: ActorModel.unscoped(),
+ required: true,
+ as: 'ActorFollowing',
+ where: {
+ preferredUsername: SERVER_ACTOR_NAME
+ },
+ include: [
+ {
+ attributes: [ 'host' ],
+ model: ServerModel.unscoped(),
+ required: true,
+ where: {
+ host: {
+ [Op.in]: hosts
+ }
+ }
+ }
+ ]
+ }
+ ]
+ }
+
+ const res = await ActorFollowModel.findAll(query)
+ const followedHosts = res.map(res => res.ActorFollowing.Server.host)
+
+ return difference(hosts, followedHosts)
+ }
+
static listAcceptedFollowerUrlsForAP (actorIds: number[], t: Transaction, start?: number, count?: number) {
return ActorFollowModel.createListAcceptedFollowForApiQuery('followers', actorIds, t, start, count)
}
acceptFollower,
cleanupTests,
flushAndRunMultipleServers,
+ MockInstancesIndex,
ServerInfo,
setAccessTokensToServers,
unfollow,
- updateCustomSubConfig
+ updateCustomSubConfig,
+ wait
} from '../../../../shared/extra-utils/index'
import { follow, getFollowersListPaginationAndSort, getFollowingListPaginationAndSort } from '../../../../shared/extra-utils/server/follows'
import { waitJobs } from '../../../../shared/extra-utils/server/jobs'
const res = await getFollowersListPaginationAndSort(following.url, 0, 5, '-createdAt')
const follows = res.body.data as ActorFollow[]
- if (exists === true) {
- expect(res.body.total).to.equal(1)
+ const follow = follows.find(f => {
+ return f.follower.host === follower.host && f.state === 'accepted'
+ })
- expect(follows[ 0 ].follower.host).to.equal(follower.host)
- expect(follows[ 0 ].state).to.equal('accepted')
+ if (exists === true) {
+ expect(follow).to.exist
} else {
- expect(follows.filter(f => f.state === 'accepted')).to.have.lengthOf(0)
+ expect(follow).to.be.undefined
}
}
const res = await getFollowingListPaginationAndSort(follower.url, 0, 5, '-createdAt')
const follows = res.body.data as ActorFollow[]
- if (exists === true) {
- expect(res.body.total).to.equal(1)
+ const follow = follows.find(f => {
+ return f.following.host === following.host && f.state === 'accepted'
+ })
- expect(follows[ 0 ].following.host).to.equal(following.host)
- expect(follows[ 0 ].state).to.equal('accepted')
+ if (exists === true) {
+ expect(follow).to.exist
} else {
- expect(follows.filter(f => f.state === 'accepted')).to.have.lengthOf(0)
+ expect(follow).to.be.undefined
}
}
}
before(async function () {
this.timeout(30000)
- servers = await flushAndRunMultipleServers(2)
+ servers = await flushAndRunMultipleServers(3)
// Get the access tokens
await setAccessTokensToServers(servers)
})
})
+ describe('Auto follow index', function () {
+ const instanceIndexServer = new MockInstancesIndex()
+
+ before(async () => {
+ await instanceIndexServer.initialize()
+ })
+
+ it('Should not auto follow index if the option is not enabled', async function () {
+ this.timeout(30000)
+
+ await wait(5000)
+ await waitJobs(servers)
+
+ await checkFollow(servers[ 0 ], servers[ 1 ], false)
+ await checkFollow(servers[ 1 ], servers[ 0 ], false)
+ })
+
+ it('Should auto follow the index', async function () {
+ this.timeout(30000)
+
+ instanceIndexServer.addInstance(servers[1].host)
+
+ const config = {
+ followings: {
+ instance: {
+ autoFollowIndex: {
+ indexUrl: 'http://localhost:42100',
+ enabled: true
+ }
+ }
+ }
+ }
+ await updateCustomSubConfig(servers[0].url, servers[0].accessToken, config)
+
+ await wait(5000)
+ await waitJobs(servers)
+
+ await checkFollow(servers[ 0 ], servers[ 1 ], true)
+
+ await resetFollows(servers)
+ })
+
+ it('Should follow new added instances in the index but not old ones', async function () {
+ this.timeout(30000)
+
+ instanceIndexServer.addInstance(servers[2].host)
+
+ await wait(5000)
+ await waitJobs(servers)
+
+ await checkFollow(servers[ 0 ], servers[ 1 ], false)
+ await checkFollow(servers[ 0 ], servers[ 2 ], true)
+ })
+ })
+
after(async function () {
await cleanupTests(servers)
})
export * from './videos/videos'
export * from './videos/video-change-ownership'
export * from './feeds/feeds'
+export * from './instances-index/mock-instances-index'
export * from './search/videos'
--- /dev/null
+import * as express from 'express'
+
+export class MockInstancesIndex {
+ private indexInstances: { host: string, createdAt: string }[] = []
+
+ initialize () {
+ return new Promise(res => {
+ const app = express()
+
+ app.use('/', (req: express.Request, res: express.Response, next: express.NextFunction) => {
+ if (process.env.DEBUG) console.log('Receiving request on mocked server %s.', req.url)
+
+ return next()
+ })
+
+ app.get('/api/v1/instances/hosts', (req: express.Request, res: express.Response) => {
+ const since = req.query.since
+
+ const filtered = this.indexInstances.filter(i => {
+ if (!since) return true
+
+ return i.createdAt > since
+ })
+
+ return res.json({
+ total: filtered.length,
+ data: filtered
+ })
+ })
+
+ app.listen(42100, () => res())
+ })
+ }
+
+ addInstance (host: string) {
+ this.indexInstances.push({ host, createdAt: new Date().toISOString() })
+ }
+}
"typeRoots": [ "node_modules/@types", "server/typings" ],
"baseUrl": "./",
"paths": {
- "@server/*": [ "server/*" ]
+ "@server/*": [ "server/*" ],
+ "@shared/*": [ "shared/*" ]
}
},
"exclude": [