From ba3cf1c5056ec433af8cc1017b338012d71e8371 Mon Sep 17 00:00:00 2001
From: Taly
Date: Sat, 14 Feb 2026 16:35:15 +0300
Subject: [PATCH 1/2] Handle DB duplicate key without recursion

Replace the previous recursive reprocessing on database duplicate-key
errors with a safer flow: log the duplicate, invalidate the event cache,
wait briefly, fetch the event inserted by the competing worker and treat
it as a repetition. If the event cannot be read back, raise a
DatabaseReadWriteError.

This avoids recursive calls to handle(), ensures fresh DB reads (cache
coherence), and adds logging for diagnostics; repetition processing is
resumed using the fetched event.
---
 workers/grouper/src/index.ts | 38 ++++++++++++++++++++++++++++++++----
 1 file changed, 34 insertions(+), 4 deletions(-)

diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts
index 52c71ca8..7d4d1500 100644
--- a/workers/grouper/src/index.ts
+++ b/workers/grouper/src/index.ts
@@ -223,17 +223,47 @@ export default class GrouperWorker extends Worker {
       } catch (e) {
         /**
          * If we caught Database duplication error, then another worker thread has already saved it to the database
-         * and we need to process this event as repetition
+         * Clear the cache and fetch the event that was just inserted, then process it as a repetition
          */
         if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
-          await this.handle(task);
+          this.logger.info(`[handle] Duplicate key detected for groupHash=${uniqueEventHash}, fetching created event as repetition`);
 
-          return;
+          const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash);
+
+          /**
+           * Invalidate cache to force fresh fetch from database
+           */
+          this.cache.del(eventCacheKey);
+
+          /**
+           * Fetch the event that was just inserted by the competing worker
+           * Add small delay to ensure the event is persisted
+           */
+          await new Promise(resolve => setTimeout(resolve, 10));
+
+          existedEvent = await this.getEvent(task.projectId, uniqueEventHash);
+
+          if (!existedEvent) {
+            this.logger.error(`[handle] Event not found after duplicate key error for groupHash=${uniqueEventHash}`);
+            throw new DatabaseReadWriteError('Event not found after duplicate key error');
+          }
+
+          this.logger.info(`[handle] Successfully fetched event after duplicate key for groupHash=${uniqueEventHash}`);
+
+          /**
+           * Now continue processing as if this was not the first occurrence
+           * This avoids recursion and properly handles the event as a repetition
+           */
         } else {
           throw e;
         }
       }
-    } else {
+    }
+
+    /**
+     * Handle repetition processing when duplicate key was detected
+     */
+    if (!isFirstOccurrence && existedEvent) {
       const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.shouldIncrementAffectedUsers(task, existedEvent);
 
       incrementDailyAffectedUsers = shouldIncrementDailyAffectedUsers;

From c3af0612377d366cbd40c080385a353aa849e303 Mon Sep 17 00:00:00 2001
From: Taly
Date: Sat, 14 Feb 2026 16:39:55 +0300
Subject: [PATCH 2/2] Update index.ts

---
 workers/grouper/src/index.ts | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts
index 7d4d1500..19b37ca0 100644
--- a/workers/grouper/src/index.ts
+++ b/workers/grouper/src/index.ts
@@ -53,6 +53,11 @@ const DB_DUPLICATE_KEY_ERROR = '11000';
  */
 const MAX_CODE_LINE_LENGTH = 140;
 
+/**
+ * Delay in milliseconds to wait for duplicate key event to be persisted to database
+ */
+const DUPLICATE_KEY_RETRY_DELAY_MS = 10;
+
 /**
  * Worker for handling Javascript events
  */
@@ -239,7 +244,7 @@ export default class GrouperWorker extends Worker {
            * Fetch the event that was just inserted by the competing worker
            * Add small delay to ensure the event is persisted
            */
-          await new Promise(resolve => setTimeout(resolve, 10));
+          await new Promise(resolve => setTimeout(resolve, DUPLICATE_KEY_RETRY_DELAY_MS));
 
           existedEvent = await this.getEvent(task.projectId, uniqueEventHash);
 