
Commit fcb3813

Implement batch support in the redis worker and use it in the TTL worker to batch-expire runs
1 parent 5bfeac4 commit fcb3813

9 files changed, +386 -203 lines


apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
@@ -604,6 +604,9 @@ const EnvironmentSchema = z
     RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT: z.coerce.number().int().optional(),
     RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS: z.coerce.number().int().default(1_000),
     RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE: z.coerce.number().int().default(100),
+    RUN_ENGINE_TTL_WORKER_CONCURRENCY: z.coerce.number().int().default(1),
+    RUN_ENGINE_TTL_WORKER_BATCH_MAX_SIZE: z.coerce.number().int().default(50),
+    RUN_ENGINE_TTL_WORKER_BATCH_MAX_WAIT_MS: z.coerce.number().int().default(5_000),

     /** Optional maximum TTL for all runs (e.g. "14d"). If set, runs without an explicit TTL
      * will use this as their TTL, and runs with a TTL larger than this will be clamped. */
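
The three new variables follow the same zod pattern as the existing TTL settings. As a standalone sketch (the schema name below is made up for illustration), z.coerce.number() parses the string value from process.env and .default() supplies the fallback when the variable is unset:

import { z } from "zod";

// Hypothetical miniature of the schema entries added above.
const TtlWorkerEnv = z.object({
  RUN_ENGINE_TTL_WORKER_CONCURRENCY: z.coerce.number().int().default(1),
  RUN_ENGINE_TTL_WORKER_BATCH_MAX_SIZE: z.coerce.number().int().default(50),
  RUN_ENGINE_TTL_WORKER_BATCH_MAX_WAIT_MS: z.coerce.number().int().default(5_000),
});

// A set value is coerced from string to number; unset values fall back to the defaults.
const parsed = TtlWorkerEnv.parse({ RUN_ENGINE_TTL_WORKER_BATCH_MAX_SIZE: "100" });
// parsed.RUN_ENGINE_TTL_WORKER_BATCH_MAX_SIZE === 100
// parsed.RUN_ENGINE_TTL_WORKER_CONCURRENCY === 1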

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 3 additions & 0 deletions
@@ -85,6 +85,9 @@ function createRunEngine() {
       shardCount: env.RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT,
       pollIntervalMs: env.RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS,
       batchSize: env.RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE,
+      workerConcurrency: env.RUN_ENGINE_TTL_WORKER_CONCURRENCY,
+      batchMaxSize: env.RUN_ENGINE_TTL_WORKER_BATCH_MAX_SIZE,
+      batchMaxWaitMs: env.RUN_ENGINE_TTL_WORKER_BATCH_MAX_WAIT_MS,
     },
   },
   runLock: {

apps/webapp/app/v3/services/batchTriggerTask.server.ts

Lines changed: 0 additions & 152 deletions
This file was deleted.

internal-packages/run-engine/src/engine/index.ts

Lines changed: 11 additions & 5 deletions
@@ -75,15 +75,15 @@ import {
   RunEngineOptions,
   TriggerParams,
 } from "./types.js";
-import { ttlWorkerCatalog } from "./ttlWorkerCatalog.js";
+import { createTtlWorkerCatalog } from "./ttlWorkerCatalog.js";
 import { workerCatalog } from "./workerCatalog.js";
 import pMap from "p-map";

 export class RunEngine {
   private runLockRedis: Redis;
   private runLock: RunLocker;
   private worker: EngineWorker;
-  private ttlWorker: Worker<typeof ttlWorkerCatalog>;
+  private ttlWorker: Worker<ReturnType<typeof createTtlWorkerCatalog>>;
   private logger: Logger;
   private tracer: Tracer;
   private meter: Meter;

@@ -341,21 +341,27 @@ export class RunEngine
       waitpointSystem: this.waitpointSystem,
     });

+    const ttlWorkerCatalog = createTtlWorkerCatalog({
+      visibilityTimeoutMs: options.queue?.ttlSystem?.visibilityTimeoutMs,
+      batchMaxSize: options.queue?.ttlSystem?.batchMaxSize,
+      batchMaxWaitMs: options.queue?.ttlSystem?.batchMaxWaitMs,
+    });
+
     this.ttlWorker = new Worker({
       name: "ttl-expiration",
       redisOptions: {
         ...options.queue.redis,
         keyPrefix: `${options.queue.redis.keyPrefix}runqueue:ttl-worker:`,
       },
       catalog: ttlWorkerCatalog,
-      concurrency: { limit: 20 },
+      concurrency: { limit: options.queue?.ttlSystem?.workerConcurrency ?? 1 },
       pollIntervalMs: options.worker.pollIntervalMs ?? 1000,
       immediatePollIntervalMs: options.worker.immediatePollIntervalMs ?? 100,
       shutdownTimeoutMs: options.worker.shutdownTimeoutMs ?? 10_000,
       logger: new Logger("RunEngineTtlWorker", options.logLevel ?? "info"),
       jobs: {
-        expireTtlRun: async ({ payload }) => {
-          await this.ttlSystem.expireRunsBatch([payload.runId]);
+        expireTtlRun: async (items) => {
+          await this.ttlSystem.expireRunsBatch(items.map((i) => i.payload.runId));
         },
       },
     });
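
With a batch entry in the catalog, the job callback receives an array of dequeued items rather than a single payload, which is why the handler above maps over items. A minimal standalone sketch of that shape (the type and helper below are illustrative, not the redis-worker API itself):

// Illustrative item shape: each dequeued job carries its own payload.
type ExpireTtlRunItem = {
  payload: { runId: string; orgId: string; queueKey: string };
};

// Illustrative batched handler: collapse the whole batch into a single
// expireRunsBatch call instead of issuing one expiry per run.
async function handleExpireTtlRunBatch(
  items: ExpireTtlRunItem[],
  expireRunsBatch: (runIds: string[]) => Promise<void>
): Promise<void> {
  await expireRunsBatch(items.map((item) => item.payload.runId));
}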

internal-packages/run-engine/src/engine/systems/ttlSystem.ts

Lines changed: 12 additions & 11 deletions
@@ -171,7 +171,7 @@ export class TtlSystem {
     const skipped: { runId: string; reason: string }[] = [];

     // Fetch all runs in a single query (no snapshot data needed)
-    const runs = await this.$.prisma.taskRun.findMany({
+    const runs = await this.$.readOnlyPrisma.taskRun.findMany({
       where: { id: { in: runIds } },
       select: {
         id: true,

@@ -182,13 +182,9 @@
         taskEventStore: true,
         createdAt: true,
         associatedWaitpoint: { select: { id: true } },
-        runtimeEnvironment: {
-          select: {
-            id: true,
-            organizationId: true,
-            projectId: true,
-          },
-        },
+        organizationId: true,
+        projectId: true,
+        runtimeEnvironmentId: true,
       },
     });

@@ -259,6 +255,11 @@
       });
     }

+    // This should really never happen
+    if (!run.organizationId) {
+      return;
+    }
+
     // Emit event
     this.$.eventBus.emit("runExpired", {
       run: {

@@ -273,9 +274,9 @@
         status: "EXPIRED" as TaskRunStatus,
       },
       time: now,
-      organization: { id: run.runtimeEnvironment.organizationId },
-      project: { id: run.runtimeEnvironment.projectId },
-      environment: { id: run.runtimeEnvironment.id },
+      organization: { id: run.organizationId },
+      project: { id: run.projectId },
+      environment: { id: run.runtimeEnvironmentId },
     });

     expired.push(run.id);

internal-packages/run-engine/src/engine/tests/ttl.test.ts

Lines changed: 4 additions & 3 deletions
@@ -733,8 +733,8 @@ describe("RunEngine ttl", () => {
     expect(executionData2.run.status).toBe("PENDING");

     // Now wait for the TTL consumer to poll and expire the run
-    // (pollIntervalMs is 5000, so we wait 7s to allow time for the poll + processing)
-    await setTimeout(7_000);
+    // (pollIntervalMs is 5000 for TTL scan + up to 5000ms batch maxWaitMs + processing)
+    await setTimeout(13_000);

     // The TTL consumer should have found and expired the run
     expect(expiredEvents.length).toBe(1);

@@ -848,7 +848,8 @@
       authenticatedEnvironment.id,
       10
     );
-    await setTimeout(7_000);
+    // Wait for TTL scan (5000ms) + batch maxWaitMs (5000ms) + processing buffer
+    await setTimeout(13_000);

     const expiredRun = await prisma.taskRun.findUnique({
       where: { id: run.id },
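
The 13s figure is the sum of the worst-case delays in the new pipeline. A small hypothetical helper makes the arithmetic explicit (the 3s processing buffer is an assumption, not a value stated in the tests):

// Hypothetical helper mirroring the comments above:
// worst case = TTL scan poll interval + batch maxWaitMs + a processing buffer.
function ttlTestWaitMs(scanPollMs = 5_000, batchMaxWaitMs = 5_000, bufferMs = 3_000): number {
  return scanPollMs + batchMaxWaitMs + bufferMs; // 13_000 with the values used in these tests
}

// await setTimeout(ttlTestWaitMs());
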
internal-packages/run-engine/src/engine/ttlWorkerCatalog.ts

Lines changed: 23 additions & 9 deletions
@@ -1,12 +1,26 @@
 import { z } from "zod";

-export const ttlWorkerCatalog = {
-  expireTtlRun: {
-    schema: z.object({
-      runId: z.string(),
-      orgId: z.string(),
-      queueKey: z.string(),
-    }),
-    visibilityTimeoutMs: 30_000,
-  },
+export type TtlWorkerCatalogOptions = {
+  visibilityTimeoutMs?: number;
+  batchMaxSize?: number;
+  batchMaxWaitMs?: number;
 };
+
+export function createTtlWorkerCatalog(options?: TtlWorkerCatalogOptions) {
+  return {
+    expireTtlRun: {
+      schema: z.object({
+        runId: z.string(),
+        orgId: z.string(),
+        queueKey: z.string(),
+      }),
+      visibilityTimeoutMs: options?.visibilityTimeoutMs ?? 120_000,
+      batch: {
+        maxSize: options?.batchMaxSize ?? 50,
+        maxWaitMs: options?.batchMaxWaitMs ?? 5_000,
+      },
+    },
+  };
+}
+
+export const ttlWorkerCatalog = createTtlWorkerCatalog();
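
A usage sketch of the new factory, assuming the same import path index.ts uses; the specific values are illustrative, not recommended settings:

import { createTtlWorkerCatalog } from "./ttlWorkerCatalog.js";

// Tighter batching for a hypothetical high-volume deployment.
const highVolumeCatalog = createTtlWorkerCatalog({
  visibilityTimeoutMs: 60_000,
  batchMaxSize: 200,
  batchMaxWaitMs: 1_000,
});

// The batch settings land on the job definition the Worker consumes:
// highVolumeCatalog.expireTtlRun.batch -> { maxSize: 200, maxWaitMs: 1_000 }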

internal-packages/run-engine/src/engine/types.ts

Lines changed: 7 additions & 1 deletion
@@ -73,8 +73,14 @@ export type RunEngineOptions = {
       batchSize?: number;
       /** Whether TTL consumers are disabled (default: false) */
       disabled?: boolean;
-      /** Visibility timeout for TTL worker jobs (ms, default: 30000) */
+      /** Visibility timeout for TTL worker jobs (ms, default: 120000) */
       visibilityTimeoutMs?: number;
+      /** Concurrency limit for the TTL redis-worker (default: 1) */
+      workerConcurrency?: number;
+      /** Max items to accumulate before flushing a batch (default: 50) */
+      batchMaxSize?: number;
+      /** Max time (ms) to wait for more items before flushing a batch (default: 5000) */
+      batchMaxWaitMs?: number;
     };
   };
   runLock: {
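
For orientation, a partial sketch of the ttlSystem block as it might be passed under the engine's queue options; field names come from the type above and from runEngine.server.ts, the values are illustrative, and the surrounding required engine options are omitted:

const ttlSystemOptions = {
  pollIntervalMs: 1_000,        // TTL scan interval (RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS)
  batchSize: 100,               // TTL scan batch size (RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE)
  visibilityTimeoutMs: 120_000, // visibility timeout for TTL worker jobs
  workerConcurrency: 1,         // concurrency limit for the TTL redis-worker
  batchMaxSize: 50,             // max items to accumulate before flushing a batch
  batchMaxWaitMs: 5_000,        // max time (ms) to wait for more items before flushing
};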
