Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/ninety-cows-lay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

feat(sdk): Support debouncing runs when triggering with new debounce options
6 changes: 6 additions & 0 deletions .cursor/rules/migrations.mdc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
description: how to create and apply database migrations
alwaysApply: false
---

Follow our [migrations.md](mdc:ai/references/migrations.md) guide for how to create and apply database migrations.
121 changes: 121 additions & 0 deletions ai/references/migrations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
## Creating and applying migrations

We use prisma migrations to manage the database schema. Please follow the following steps when editing the `internal-packages/database/prisma/schema.prisma` file:

Edit the `schema.prisma` file to add or modify the schema.

Create a new migration file but don't apply it yet:

```bash
cd internal-packages/database
pnpm run db:migrate:dev:create --name "add_new_column_to_table"
```

The migration file will be created in the `prisma/migrations` directory, but it will have a bunch of edits to the schema that are not needed and will need to be removed before we can apply the migration. Here's an example of what the migration file might look like:

```sql
-- AlterEnum
ALTER TYPE "public"."TaskRunExecutionStatus" ADD VALUE 'DELAYED';

-- AlterTable
ALTER TABLE "public"."TaskRun" ADD COLUMN "debounce" JSONB;

-- AlterTable
ALTER TABLE "public"."_BackgroundWorkerToBackgroundWorkerFile" ADD CONSTRAINT "_BackgroundWorkerToBackgroundWorkerFile_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_BackgroundWorkerToBackgroundWorkerFile_AB_unique";

-- AlterTable
ALTER TABLE "public"."_BackgroundWorkerToTaskQueue" ADD CONSTRAINT "_BackgroundWorkerToTaskQueue_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_BackgroundWorkerToTaskQueue_AB_unique";

-- AlterTable
ALTER TABLE "public"."_TaskRunToTaskRunTag" ADD CONSTRAINT "_TaskRunToTaskRunTag_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_TaskRunToTaskRunTag_AB_unique";

-- AlterTable
ALTER TABLE "public"."_WaitpointRunConnections" ADD CONSTRAINT "_WaitpointRunConnections_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_WaitpointRunConnections_AB_unique";

-- AlterTable
ALTER TABLE "public"."_completedWaitpoints" ADD CONSTRAINT "_completedWaitpoints_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_completedWaitpoints_AB_unique";

-- CreateIndex
CREATE INDEX "SecretStore_key_idx" ON "public"."SecretStore"("key" text_pattern_ops);

-- CreateIndex
CREATE INDEX "TaskRun_runtimeEnvironmentId_id_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "id" DESC);

-- CreateIndex
CREATE INDEX "TaskRun_runtimeEnvironmentId_createdAt_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "createdAt" DESC);
```

All the following lines should be removed:

```sql
-- AlterTable
ALTER TABLE "public"."_BackgroundWorkerToBackgroundWorkerFile" ADD CONSTRAINT "_BackgroundWorkerToBackgroundWorkerFile_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_BackgroundWorkerToBackgroundWorkerFile_AB_unique";

-- AlterTable
ALTER TABLE "public"."_BackgroundWorkerToTaskQueue" ADD CONSTRAINT "_BackgroundWorkerToTaskQueue_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_BackgroundWorkerToTaskQueue_AB_unique";

-- AlterTable
ALTER TABLE "public"."_TaskRunToTaskRunTag" ADD CONSTRAINT "_TaskRunToTaskRunTag_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_TaskRunToTaskRunTag_AB_unique";

-- AlterTable
ALTER TABLE "public"."_WaitpointRunConnections" ADD CONSTRAINT "_WaitpointRunConnections_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_WaitpointRunConnections_AB_unique";

-- AlterTable
ALTER TABLE "public"."_completedWaitpoints" ADD CONSTRAINT "_completedWaitpoints_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_completedWaitpoints_AB_unique";

-- CreateIndex
CREATE INDEX "SecretStore_key_idx" ON "public"."SecretStore"("key" text_pattern_ops);

-- CreateIndex
CREATE INDEX "TaskRun_runtimeEnvironmentId_id_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "id" DESC);

-- CreateIndex
CREATE INDEX "TaskRun_runtimeEnvironmentId_createdAt_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "createdAt" DESC);
```

Leaving only this:

```sql
-- AlterEnum
ALTER TYPE "public"."TaskRunExecutionStatus" ADD VALUE 'DELAYED';

-- AlterTable
ALTER TABLE "public"."TaskRun" ADD COLUMN "debounce" JSONB;
```

After editing the migration file, apply the migration:

```bash
cd internal-packages/database
pnpm run db:migrate:deploy && pnpm run generate
```
6 changes: 6 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,12 @@ const EnvironmentSchema = z
.default(60_000),
RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR: z.coerce.number().default(2),

/** Maximum duration in milliseconds that a run can be debounced. Default: 1 hour (3,600,000ms) */
RUN_ENGINE_MAXIMUM_DEBOUNCE_DURATION_MS: z.coerce
.number()
.int()
.default(60_000 * 60), // 1 hour

RUN_ENGINE_WORKER_REDIS_HOST: z
.string()
.optional()
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ export class SpanPresenter extends BasePresenter {
environmentId: run.runtimeEnvironment.id,
idempotencyKey: run.idempotencyKey,
idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt,
debounce: run.debounce as { key: string; delay: string; createdAt: Date } | null,
schedule: await this.resolveSchedule(run.scheduleId ?? undefined),
queue: {
name: run.queue,
Expand Down Expand Up @@ -357,6 +358,8 @@ export class SpanPresenter extends BasePresenter {
//idempotency
idempotencyKey: true,
idempotencyKeyExpiresAt: true,
//debounce
debounce: true,
//delayed
delayUntil: true,
//ttl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,19 @@ function RunBody({
)}
</Property.Value>
</Property.Item>
<Property.Item>
<Property.Label>Debounce</Property.Label>
<Property.Value>
{run.debounce ? (
<div>
<div className="break-all">Key: {run.debounce.key}</div>
<div>Delay: {run.debounce.delay}</div>
</div>
) : (
"–"
)}
</Property.Value>
</Property.Item>
<Property.Item>
<Property.Label>Version</Property.Label>
<Property.Value>
Expand Down
68 changes: 68 additions & 0 deletions apps/webapp/app/runEngine/concerns/traceEvents.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
traceparent,
setAttribute: (key, value) => event.setAttribute(key as any, value),
failWithError: event.failWithError.bind(event),
stop: event.stop.bind(event),
},
store
);
Expand Down Expand Up @@ -116,6 +117,73 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
traceparent,
setAttribute: (key, value) => event.setAttribute(key as any, value),
failWithError: event.failWithError.bind(event),
stop: event.stop.bind(event),
},
store
);
}
);
}

async traceDebouncedRun<T>(
request: TriggerTaskRequest,
parentStore: string | undefined,
options: {
existingRun: TaskRun;
debounceKey: string;
incomplete: boolean;
isError: boolean;
},
callback: (span: TracedEventSpan, store: string) => Promise<T>
): Promise<T> {
const { existingRun, debounceKey, incomplete, isError } = options;
const { repository, store } = await this.#getEventRepository(request, parentStore);

return await repository.traceEvent(
`${request.taskId} (debounced)`,
{
context: request.options?.traceContext,
spanParentAsLink: request.options?.spanParentAsLink,
kind: "SERVER",
environment: request.environment,
taskSlug: request.taskId,
attributes: {
properties: {
[SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId,
},
style: {
icon: "task-cached",
},
runId: existingRun.friendlyId,
},
incomplete,
isError,
immediate: true,
},
async (event, traceContext, traceparent) => {
// Log a message about the debounced trigger
await repository.recordEvent(
`Debounced: using existing run with key "${debounceKey}"`,
{
taskSlug: request.taskId,
environment: request.environment,
attributes: {
runId: existingRun.friendlyId,
},
context: request.options?.traceContext,
parentId: event.spanId,
}
);

return await callback(
{
traceId: event.traceId,
spanId: event.spanId,
traceContext,
traceparent,
setAttribute: (key, value) => event.setAttribute(key as any, value),
failWithError: event.failWithError.bind(event),
stop: event.stop.bind(event),
},
store
);
Expand Down
45 changes: 43 additions & 2 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,19 @@ export class RunEngineTriggerTaskService {
}
}

const [parseDelayError, delayUntil] = await tryCatch(parseDelay(body.options?.delay));
// Parse delay from either explicit delay option or debounce.delay
const delaySource = body.options?.delay ?? body.options?.debounce?.delay;
const [parseDelayError, delayUntil] = await tryCatch(parseDelay(delaySource));

if (parseDelayError) {
throw new ServiceValidationError(`Invalid delay ${body.options?.delay}`);
throw new ServiceValidationError(`Invalid delay ${delaySource}`);
}

// Validate debounce options
if (body.options?.debounce && !delayUntil) {
throw new ServiceValidationError(
`Debounce requires a valid delay duration. Provided: ${body.options.debounce.delay}`
);
}

const ttl =
Expand Down Expand Up @@ -340,10 +349,42 @@ export class RunEngineTriggerTaskService {
bulkActionId: body.options?.bulkActionId,
planType,
realtimeStreamsVersion: options.realtimeStreamsVersion,
debounce: body.options?.debounce,
// When debouncing with triggerAndWait, create a span for the debounced trigger
onDebounced:
body.options?.debounce && body.options?.resumeParentOnCompletion
? async ({ existingRun, waitpoint, debounceKey }) => {
return await this.traceEventConcern.traceDebouncedRun(
triggerRequest,
parentRun?.taskEventStore,
{
existingRun,
debounceKey,
incomplete: waitpoint.status === "PENDING",
isError: waitpoint.outputIsError,
},
async (spanEvent) => {
const spanId =
options?.parentAsLinkType === "replay"
? spanEvent.spanId
: spanEvent.traceparent?.spanId
? `${spanEvent.traceparent.spanId}:${spanEvent.spanId}`
: spanEvent.spanId;
return spanId;
}
);
}
: undefined,
},
this.prisma
);

// If the returned run has a different friendlyId, it was debounced
// Stop the outer span to prevent a duplicate - the debounced span was created via onDebounced
if (taskRun.friendlyId !== runFriendlyId) {
event.stop();
}

const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined;

if (error) {
Expand Down
17 changes: 17 additions & 0 deletions apps/webapp/app/runEngine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ export type TracedEventSpan = {
};
setAttribute: (key: string, value: string) => void;
failWithError: (error: TaskRunError) => void;
/**
* Stop the span without writing any event.
* Used when a debounced run is returned - the span for the debounced
* trigger is created separately via traceDebouncedRun.
*/
stop: () => void;
};

export interface TraceEventConcern {
Expand All @@ -150,6 +156,17 @@ export interface TraceEventConcern {
},
callback: (span: TracedEventSpan, store: string) => Promise<T>
): Promise<T>;
traceDebouncedRun<T>(
request: TriggerTaskRequest,
parentStore: string | undefined,
options: {
existingRun: TaskRun;
debounceKey: string;
incomplete: boolean;
isError: boolean;
},
callback: (span: TracedEventSpan, store: string) => Promise<T>
): Promise<T>;
}

export type TriggerRacepoints = "idempotencyKey";
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ function createRunEngine() {
? createBatchGlobalRateLimiter(env.BATCH_QUEUE_GLOBAL_RATE_LIMIT)
: undefined,
},
// Debounce configuration
debounce: {
maxDebounceDurationMs: env.RUN_ENGINE_MAXIMUM_DEBOUNCE_DURATION_MS,
},
});

return engine;
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"lint": "eslint --cache --cache-location ./node_modules/.cache/eslint .",
"start": "cross-env NODE_ENV=production node --max-old-space-size=8192 ./build/server.js",
"start:local": "cross-env node --max-old-space-size=8192 ./build/server.js",
"typecheck": "tsc --noEmit -p ./tsconfig.check.json",
"typecheck": "cross-env NODE_OPTIONS=\"--max-old-space-size=8192\" tsc --noEmit -p ./tsconfig.check.json",
"db:seed": "tsx seed.mts",
"upload:sourcemaps": "bash ./upload-sourcemaps.sh",
"test": "vitest --no-file-parallelism",
Expand Down Expand Up @@ -288,4 +288,4 @@
"engines": {
"node": ">=18.19.0 || >=20.6.0"
}
}
}
Loading
Loading