Skip to content

Commit 94e138b

Browse files
committed
Only engage the ttl system when a run is first enqueued, no longer on re-enqueues
1 parent 5c9689c commit 94e138b

File tree

3 files changed

+177
-2
lines changed

3 files changed

+177
-2
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,7 @@ export class RunEngine {
777777
runnerId,
778778
tx: prisma,
779779
skipRunLock: true,
780+
includeTtl: true,
780781
});
781782
}
782783

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export class EnqueueSystem {
3535
workerId,
3636
runnerId,
3737
skipRunLock,
38+
includeTtl = false,
3839
}: {
3940
run: TaskRun;
4041
env: MinimalAuthenticatedEnvironment;
@@ -54,6 +55,8 @@ export class EnqueueSystem {
5455
workerId?: string;
5556
runnerId?: string;
5657
skipRunLock?: boolean;
58+
/** When true, include TTL in the queued message (only for first enqueue from trigger). Default false. */
59+
includeTtl?: boolean;
5760
}) {
5861
const prisma = tx ?? this.$.prisma;
5962

@@ -82,9 +85,10 @@ export class EnqueueSystem {
8285

8386
const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs;
8487

85-
// Calculate TTL expiration timestamp if the run has a TTL
88+
// Include TTL only when explicitly requested (first enqueue from trigger).
89+
// Re-enqueues (waitpoint, checkpoint, delayed, pending version) must not add TTL.
8690
let ttlExpiresAt: number | undefined;
87-
if (run.ttl) {
91+
if (includeTtl && run.ttl) {
8892
const expireAt = parseNaturalLanguageDuration(run.ttl);
8993
if (expireAt) {
9094
ttlExpiresAt = expireAt.getTime();

internal-packages/run-engine/src/engine/tests/ttl.test.ts

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,176 @@ describe("RunEngine ttl", () => {
132132
}
133133
});
134134

135+
containerTest("First enqueue from trigger includes ttlExpiresAt in message", async ({
136+
prisma,
137+
redisOptions,
138+
}) => {
139+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
140+
141+
const engine = new RunEngine({
142+
prisma,
143+
worker: {
144+
redis: redisOptions,
145+
workers: 1,
146+
tasksPerWorker: 10,
147+
pollIntervalMs: 100,
148+
},
149+
queue: {
150+
redis: redisOptions,
151+
processWorkerQueueDebounceMs: 50,
152+
masterQueueConsumersDisabled: true,
153+
ttlSystem: {
154+
pollIntervalMs: 100,
155+
batchSize: 10,
156+
},
157+
},
158+
runLock: {
159+
redis: redisOptions,
160+
},
161+
machines: {
162+
defaultMachine: "small-1x",
163+
machines: {
164+
"small-1x": {
165+
name: "small-1x" as const,
166+
cpu: 0.5,
167+
memory: 0.5,
168+
centsPerMs: 0.0001,
169+
},
170+
},
171+
baseCostInCents: 0.0001,
172+
},
173+
tracer: trace.getTracer("test", "0.0.0"),
174+
});
175+
176+
try {
177+
const taskIdentifier = "test-task";
178+
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
179+
180+
const run = await engine.trigger(
181+
{
182+
number: 1,
183+
friendlyId: "run_ttlmsg1",
184+
environment: authenticatedEnvironment,
185+
taskIdentifier,
186+
payload: "{}",
187+
payloadType: "application/json",
188+
context: {},
189+
traceContext: {},
190+
traceId: "t_ttl",
191+
spanId: "s_ttl",
192+
workerQueue: "main",
193+
queue: "task/test-task",
194+
isTest: false,
195+
tags: [],
196+
ttl: "1s",
197+
},
198+
prisma
199+
);
200+
201+
const message = await engine.runQueue.readMessage(
202+
authenticatedEnvironment.organization.id,
203+
run.id
204+
);
205+
assertNonNullable(message);
206+
expect(message.ttlExpiresAt).toBeDefined();
207+
expect(typeof message.ttlExpiresAt).toBe("number");
208+
} finally {
209+
await engine.quit();
210+
}
211+
});
212+
213+
containerTest("Re-enqueue with includeTtl false does not set ttlExpiresAt", async ({
214+
prisma,
215+
redisOptions,
216+
}) => {
217+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
218+
219+
const engine = new RunEngine({
220+
prisma,
221+
worker: {
222+
redis: redisOptions,
223+
workers: 1,
224+
tasksPerWorker: 10,
225+
pollIntervalMs: 100,
226+
},
227+
queue: {
228+
redis: redisOptions,
229+
processWorkerQueueDebounceMs: 50,
230+
masterQueueConsumersDisabled: true,
231+
ttlSystem: {
232+
pollIntervalMs: 100,
233+
batchSize: 10,
234+
},
235+
},
236+
runLock: {
237+
redis: redisOptions,
238+
},
239+
machines: {
240+
defaultMachine: "small-1x",
241+
machines: {
242+
"small-1x": {
243+
name: "small-1x" as const,
244+
cpu: 0.5,
245+
memory: 0.5,
246+
centsPerMs: 0.0001,
247+
},
248+
},
249+
baseCostInCents: 0.0001,
250+
},
251+
tracer: trace.getTracer("test", "0.0.0"),
252+
});
253+
254+
try {
255+
const taskIdentifier = "test-task";
256+
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
257+
258+
const run = await engine.trigger(
259+
{
260+
number: 1,
261+
friendlyId: "run_reenq01",
262+
environment: authenticatedEnvironment,
263+
taskIdentifier,
264+
payload: "{}",
265+
payloadType: "application/json",
266+
context: {},
267+
traceContext: {},
268+
traceId: "t_re",
269+
spanId: "s_re",
270+
workerQueue: "main",
271+
queue: "task/test-task",
272+
isTest: false,
273+
tags: [],
274+
ttl: "1s",
275+
},
276+
prisma
277+
);
278+
279+
const messageAfterTrigger = await engine.runQueue.readMessage(
280+
authenticatedEnvironment.organization.id,
281+
run.id
282+
);
283+
assertNonNullable(messageAfterTrigger);
284+
expect(messageAfterTrigger.ttlExpiresAt).toBeDefined();
285+
286+
await engine.enqueueSystem.enqueueRun({
287+
run,
288+
env: authenticatedEnvironment,
289+
tx: prisma,
290+
skipRunLock: true,
291+
includeTtl: false,
292+
});
293+
294+
const messageAfterReenqueue = await engine.runQueue.readMessage(
295+
authenticatedEnvironment.organization.id,
296+
run.id
297+
);
298+
assertNonNullable(messageAfterReenqueue);
299+
expect(messageAfterReenqueue.ttlExpiresAt).toBeUndefined();
300+
} finally {
301+
await engine.quit();
302+
}
303+
});
304+
135305
containerTest("Multiple runs expiring via TTL batch", async ({ prisma, redisOptions }) => {
136306
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
137307

0 commit comments

Comments
 (0)