'video-file': 1,
'email': 5
}
-const BROADCAST_CONCURRENCY = 5 // How many requests in parallel we do in activitypub-http-broadcast job
-// 2 days
-const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2
+const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job
+const JOB_REQUEST_TIMEOUT = 3000 // 3 seconds
+const JOB_REQUEST_TTL = 60000 * 10 // 10 minutes
+const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2 // 2 days
// 1 hour
let SCHEDULER_INTERVAL = 60000 * 60
VIDEO_RATE_TYPES,
VIDEO_MIMETYPE_EXT,
VIDEO_TRANSCODING_FPS,
+ JOB_REQUEST_TIMEOUT,
+ JOB_REQUEST_TTL,
USER_PASSWORD_RESET_LIFETIME,
IMAGE_MIMETYPE_EXT,
SCHEDULER_INTERVAL,
import { doRequest } from '../../../helpers/requests'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
-import { BROADCAST_CONCURRENCY } from '../../../initializers'
+import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers'
export type ActivitypubHttpBroadcastPayload = {
uris: string[]
method: 'POST',
uri: '',
json: body,
- httpSignature: httpSignatureOptions
+ httpSignature: httpSignatureOptions,
+ timeout: JOB_REQUEST_TIMEOUT
}
const badUrls: string[] = []
import * as kue from 'kue'
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
-import { ACTIVITY_PUB } from '../../../initializers'
+import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../../initializers'
import { processActivities } from '../../activitypub/process'
import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
method: 'GET',
uri: '',
json: true,
- activityPub: true
+ activityPub: true,
+ timeout: JOB_REQUEST_TIMEOUT
}
for (const uri of payload.uris) {
import { doRequest } from '../../../helpers/requests'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
+import { JOB_REQUEST_TIMEOUT } from '../../../initializers'
export type ActivitypubHttpUnicastPayload = {
uri: string
method: 'POST',
uri,
json: body,
- httpSignature: httpSignatureOptions
+ httpSignature: httpSignatureOptions,
+ timeout: JOB_REQUEST_TIMEOUT
}
try {
import * as kue from 'kue'
import { JobState, JobType } from '../../../shared/models'
import { logger } from '../../helpers/logger'
-import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
+import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
import { Redis } from '../redis'
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
'email': processEmail
}
+const jobsWithTLL: JobType[] = [
+ 'activitypub-http-broadcast',
+ 'activitypub-http-unicast',
+ 'activitypub-http-fetcher',
+ 'activitypub-follow'
+]
+
class JobQueue {
private static instance: JobQueue
createJob (obj: CreateJobArgument, priority = 'normal') {
return new Promise((res, rej) => {
- this.jobQueue
+ let job = this.jobQueue
.create(obj.type, obj.payload)
.priority(priority)
.attempts(JOB_ATTEMPTS[obj.type])
.backoff({ delay: 60 * 1000, type: 'exponential' })
- .save(err => {
- if (err) return rej(err)
- return res()
- })
+ if (jobsWithTLL.indexOf(obj.type) !== -1) {
+ job = job.ttl(JOB_REQUEST_TTL)
+ }
+
+ return job.save(err => {
+ if (err) return rej(err)
+
+ return res()
+ })
})
}