Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion packages/server/src/queue/BaseQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ const QUEUE_REDIS_EVENT_STREAM_MAX_LEN = process.env.QUEUE_REDIS_EVENT_STREAM_MA
? parseInt(process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN)
: 10000
const WORKER_CONCURRENCY = process.env.WORKER_CONCURRENCY ? parseInt(process.env.WORKER_CONCURRENCY) : 100000
const WORKER_STALLED_INTERVAL = parseInt(process.env.WORKER_STALLED_INTERVAL ?? '300000', 10)
const WORKER_MAX_STALLED_COUNT = parseInt(process.env.WORKER_MAX_STALLED_COUNT ?? '0', 10)
if (isNaN(WORKER_STALLED_INTERVAL) || WORKER_STALLED_INTERVAL <= 0) {
throw new Error('Invalid WORKER_STALLED_INTERVAL: "' + process.env.WORKER_STALLED_INTERVAL + '". Must be a positive integer (ms).')
}
if (isNaN(WORKER_MAX_STALLED_COUNT) || WORKER_MAX_STALLED_COUNT < 0) {
throw new Error('Invalid WORKER_MAX_STALLED_COUNT: "' + process.env.WORKER_MAX_STALLED_COUNT + '". Must be a non-negative integer.')
}
const REMOVE_ON_AGE = process.env.REMOVE_ON_AGE ? parseInt(process.env.REMOVE_ON_AGE) : -1
const REMOVE_ON_COUNT = process.env.REMOVE_ON_COUNT ? parseInt(process.env.REMOVE_ON_COUNT) : -1

Expand Down Expand Up @@ -81,7 +89,9 @@ export abstract class BaseQueue {
},
{
connection: this.connection,
concurrency
concurrency,
stalledInterval: WORKER_STALLED_INTERVAL,
maxStalledCount: WORKER_MAX_STALLED_COUNT
}
)

Expand Down
Loading