diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 52c71ca8..19b37ca0 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -53,6 +53,11 @@ const DB_DUPLICATE_KEY_ERROR = '11000'; */ const MAX_CODE_LINE_LENGTH = 140; +/** + * Delay in milliseconds to wait for duplicate key event to be persisted to database + */ +const DUPLICATE_KEY_RETRY_DELAY_MS = 10; + /** * Worker for handling Javascript events */ @@ -223,17 +228,47 @@ export default class GrouperWorker extends Worker { } catch (e) { /** * If we caught Database duplication error, then another worker thread has already saved it to the database - * and we need to process this event as repetition + * Clear the cache and fetch the event that was just inserted, then process it as a repetition */ if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) { - await this.handle(task); + this.logger.info(`[handle] Duplicate key detected for groupHash=${uniqueEventHash}, fetching created event as repetition`); + + const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash); - return; + /** + * Invalidate cache to force fresh fetch from database + */ + this.cache.del(eventCacheKey); + + /** + * Fetch the event that was just inserted by the competing worker + * Add small delay to ensure the event is persisted + */ + await new Promise(resolve => setTimeout(resolve, DUPLICATE_KEY_RETRY_DELAY_MS)); + + existedEvent = await this.getEvent(task.projectId, uniqueEventHash); + + if (!existedEvent) { + this.logger.error(`[handle] Event not found after duplicate key error for groupHash=${uniqueEventHash}`); + throw new DatabaseReadWriteError('Event not found after duplicate key error'); + } + + this.logger.info(`[handle] Successfully fetched event after duplicate key for groupHash=${uniqueEventHash}`); + + /** + * Now continue processing as if this was not the first occurrence + * This avoids recursion and properly handles the event as a repetition + */ } else { throw e; } } - } else { + } + + /** + * Handle repetition processing when duplicate key was detected + */ + if (!isFirstOccurrence && existedEvent) { const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.shouldIncrementAffectedUsers(task, existedEvent); incrementDailyAffectedUsers = shouldIncrementDailyAffectedUsers;