diff --git a/packages/server/src/queue/BaseQueue.ts b/packages/server/src/queue/BaseQueue.ts index 87b3a9faef7..4964f22ac0a 100644 --- a/packages/server/src/queue/BaseQueue.ts +++ b/packages/server/src/queue/BaseQueue.ts @@ -6,6 +6,14 @@ const QUEUE_REDIS_EVENT_STREAM_MAX_LEN = process.env.QUEUE_REDIS_EVENT_STREAM_MA ? parseInt(process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN) : 10000 const WORKER_CONCURRENCY = process.env.WORKER_CONCURRENCY ? parseInt(process.env.WORKER_CONCURRENCY) : 100000 +const WORKER_STALLED_INTERVAL = parseInt(process.env.WORKER_STALLED_INTERVAL ?? '300000', 10) +const WORKER_MAX_STALLED_COUNT = parseInt(process.env.WORKER_MAX_STALLED_COUNT ?? '0', 10) +if (isNaN(WORKER_STALLED_INTERVAL) || WORKER_STALLED_INTERVAL <= 0) { + throw new Error('Invalid WORKER_STALLED_INTERVAL: "' + process.env.WORKER_STALLED_INTERVAL + '". Must be a positive integer (ms).') +} +if (isNaN(WORKER_MAX_STALLED_COUNT) || WORKER_MAX_STALLED_COUNT < 0) { + throw new Error('Invalid WORKER_MAX_STALLED_COUNT: "' + process.env.WORKER_MAX_STALLED_COUNT + '". Must be a non-negative integer.') +} const REMOVE_ON_AGE = process.env.REMOVE_ON_AGE ? parseInt(process.env.REMOVE_ON_AGE) : -1 const REMOVE_ON_COUNT = process.env.REMOVE_ON_COUNT ? parseInt(process.env.REMOVE_ON_COUNT) : -1 @@ -81,7 +89,9 @@ export abstract class BaseQueue { }, { connection: this.connection, - concurrency + concurrency, + stalledInterval: WORKER_STALLED_INTERVAL, + maxStalledCount: WORKER_MAX_STALLED_COUNT } )