From e8cf7c70e94b552133122f5228900db5d2627654 Mon Sep 17 00:00:00 2001 From: Nikitas Date: Thu, 15 May 2025 15:22:27 +0200 Subject: [PATCH 1/4] removed the pgcrypto extension creation (Azure bug) --- .../PostgresRecordManager/PostgresRecordManager.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts index ebb429af0ac..500fc413fc9 100644 --- a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts @@ -227,8 +227,6 @@ class PostgresRecordManager implements RecordManagerInterface { const queryRunner = dataSource.createQueryRunner() const tableName = this.sanitizeTableName(this.tableName) - await queryRunner.query('CREATE EXTENSION IF NOT EXISTS pgcrypto;') - await queryRunner.manager.query(` CREATE TABLE IF NOT EXISTS "${tableName}" ( uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(), From daea556a11e8a39519315ac885476d8e3c02830e Mon Sep 17 00:00:00 2001 From: Nikitas Date: Mon, 8 Jun 2026 18:53:57 +0200 Subject: [PATCH 2/4] feat: add configurable BullMQ worker stall options BullMQ's default stall detection (30s interval, 1 retry) causes long-running jobs like document store upserts to be silently retried, resulting in duplicate vector embeddings in the database. Adds two new environment variables to control this behaviour: - WORKER_STALLED_INTERVAL (ms): how often BullMQ checks for stalled jobs. Defaults to 300000 (5 min) instead of BullMQ's 30s default. - WORKER_MAX_STALLED_COUNT: how many times a stalled job is retried before failing. Defaults to 0 (fail immediately, no retry) instead of BullMQ's default of 1. Setting WORKER_MAX_STALLED_COUNT=0 prevents duplicate processing on retry while still allowing the job to be re-triggered manually if needed. --- packages/server/src/queue/BaseQueue.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/server/src/queue/BaseQueue.ts b/packages/server/src/queue/BaseQueue.ts index 87b3a9faef7..eb51e2bab73 100644 --- a/packages/server/src/queue/BaseQueue.ts +++ b/packages/server/src/queue/BaseQueue.ts @@ -6,6 +6,8 @@ const QUEUE_REDIS_EVENT_STREAM_MAX_LEN = process.env.QUEUE_REDIS_EVENT_STREAM_MA ? parseInt(process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN) : 10000 const WORKER_CONCURRENCY = process.env.WORKER_CONCURRENCY ? parseInt(process.env.WORKER_CONCURRENCY) : 100000 +const WORKER_STALLED_INTERVAL = process.env.WORKER_STALLED_INTERVAL ? parseInt(process.env.WORKER_STALLED_INTERVAL) : 300000 +const WORKER_MAX_STALLED_COUNT = process.env.WORKER_MAX_STALLED_COUNT ? parseInt(process.env.WORKER_MAX_STALLED_COUNT) : 0 const REMOVE_ON_AGE = process.env.REMOVE_ON_AGE ? parseInt(process.env.REMOVE_ON_AGE) : -1 const REMOVE_ON_COUNT = process.env.REMOVE_ON_COUNT ? parseInt(process.env.REMOVE_ON_COUNT) : -1 @@ -81,7 +83,9 @@ export abstract class BaseQueue { }, { connection: this.connection, - concurrency + concurrency, + stalledInterval: WORKER_STALLED_INTERVAL, + maxStalledCount: WORKER_MAX_STALLED_COUNT } ) From 9c3fd41578787d8ad5c120de56c17bd29976d5db Mon Sep 17 00:00:00 2001 From: Nikitas Date: Mon, 8 Jun 2026 18:56:58 +0200 Subject: [PATCH 3/4] refactor: add parseInt radix and fail-fast validation for stall env vars --- packages/server/src/queue/BaseQueue.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/server/src/queue/BaseQueue.ts b/packages/server/src/queue/BaseQueue.ts index eb51e2bab73..18dcc42a3db 100644 --- a/packages/server/src/queue/BaseQueue.ts +++ b/packages/server/src/queue/BaseQueue.ts @@ -6,8 +6,12 @@ const QUEUE_REDIS_EVENT_STREAM_MAX_LEN = process.env.QUEUE_REDIS_EVENT_STREAM_MA ? parseInt(process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN) : 10000 const WORKER_CONCURRENCY = process.env.WORKER_CONCURRENCY ? parseInt(process.env.WORKER_CONCURRENCY) : 100000 -const WORKER_STALLED_INTERVAL = process.env.WORKER_STALLED_INTERVAL ? parseInt(process.env.WORKER_STALLED_INTERVAL) : 300000 -const WORKER_MAX_STALLED_COUNT = process.env.WORKER_MAX_STALLED_COUNT ? parseInt(process.env.WORKER_MAX_STALLED_COUNT) : 0 +const WORKER_STALLED_INTERVAL = process.env.WORKER_STALLED_INTERVAL ? parseInt(process.env.WORKER_STALLED_INTERVAL, 10) : 300000 +const WORKER_MAX_STALLED_COUNT = process.env.WORKER_MAX_STALLED_COUNT ? parseInt(process.env.WORKER_MAX_STALLED_COUNT, 10) : 0 +if (isNaN(WORKER_STALLED_INTERVAL) || WORKER_STALLED_INTERVAL <= 0) + throw new Error(`Invalid WORKER_STALLED_INTERVAL: "${process.env.WORKER_STALLED_INTERVAL}". Must be a positive integer (ms).`) +if (isNaN(WORKER_MAX_STALLED_COUNT) || WORKER_MAX_STALLED_COUNT < 0) + throw new Error(`Invalid WORKER_MAX_STALLED_COUNT: "${process.env.WORKER_MAX_STALLED_COUNT}". Must be a non-negative integer.`) const REMOVE_ON_AGE = process.env.REMOVE_ON_AGE ? parseInt(process.env.REMOVE_ON_AGE) : -1 const REMOVE_ON_COUNT = process.env.REMOVE_ON_COUNT ? parseInt(process.env.REMOVE_ON_COUNT) : -1 From 7f0e7184bde57d4524589f409a8fe027a2cdff06 Mon Sep 17 00:00:00 2001 From: Nikitas Papadopoulos Date: Tue, 9 Jun 2026 08:33:34 +0200 Subject: [PATCH 4/4] Update packages/server/src/queue/BaseQueue.ts Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- packages/server/src/queue/BaseQueue.ts | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/server/src/queue/BaseQueue.ts b/packages/server/src/queue/BaseQueue.ts index 18dcc42a3db..4964f22ac0a 100644 --- a/packages/server/src/queue/BaseQueue.ts +++ b/packages/server/src/queue/BaseQueue.ts @@ -6,12 +6,14 @@ const QUEUE_REDIS_EVENT_STREAM_MAX_LEN = process.env.QUEUE_REDIS_EVENT_STREAM_MA ? parseInt(process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN) : 10000 const WORKER_CONCURRENCY = process.env.WORKER_CONCURRENCY ? parseInt(process.env.WORKER_CONCURRENCY) : 100000 -const WORKER_STALLED_INTERVAL = process.env.WORKER_STALLED_INTERVAL ? parseInt(process.env.WORKER_STALLED_INTERVAL, 10) : 300000 -const WORKER_MAX_STALLED_COUNT = process.env.WORKER_MAX_STALLED_COUNT ? parseInt(process.env.WORKER_MAX_STALLED_COUNT, 10) : 0 -if (isNaN(WORKER_STALLED_INTERVAL) || WORKER_STALLED_INTERVAL <= 0) - throw new Error(`Invalid WORKER_STALLED_INTERVAL: "${process.env.WORKER_STALLED_INTERVAL}". Must be a positive integer (ms).`) -if (isNaN(WORKER_MAX_STALLED_COUNT) || WORKER_MAX_STALLED_COUNT < 0) - throw new Error(`Invalid WORKER_MAX_STALLED_COUNT: "${process.env.WORKER_MAX_STALLED_COUNT}". Must be a non-negative integer.`) +const WORKER_STALLED_INTERVAL = parseInt(process.env.WORKER_STALLED_INTERVAL ?? '300000', 10) +const WORKER_MAX_STALLED_COUNT = parseInt(process.env.WORKER_MAX_STALLED_COUNT ?? '0', 10) +if (isNaN(WORKER_STALLED_INTERVAL) || WORKER_STALLED_INTERVAL <= 0) { + throw new Error('Invalid WORKER_STALLED_INTERVAL: "' + process.env.WORKER_STALLED_INTERVAL + '". Must be a positive integer (ms).') +} +if (isNaN(WORKER_MAX_STALLED_COUNT) || WORKER_MAX_STALLED_COUNT < 0) { + throw new Error('Invalid WORKER_MAX_STALLED_COUNT: "' + process.env.WORKER_MAX_STALLED_COUNT + '". Must be a non-negative integer.') +} const REMOVE_ON_AGE = process.env.REMOVE_ON_AGE ? parseInt(process.env.REMOVE_ON_AGE) : -1 const REMOVE_ON_COUNT = process.env.REMOVE_ON_COUNT ? parseInt(process.env.REMOVE_ON_COUNT) : -1