Commit 83c6933

Merge branch 'main' into mollifier-phase-2
2 parents 68ae8b0 + 6c9f1f1 commit 83c6933

43 files changed

Lines changed: 1795 additions & 428 deletions

.env.example

Lines changed: 47 additions & 1 deletion
@@ -29,6 +29,52 @@ REDIS_TLS_DISABLED="true"
 DEV_OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:3030/otel"
 DEV_OTEL_BATCH_PROCESSING_ENABLED="0"
 
+# Realtime streams v2 (Sessions, chat.agent, large stream backfills) backed
+# by S2 (https://s2.dev). The `s2` service in docker/docker-compose.yml runs
+# the open-source s2-lite binary and pre-creates a basin named `trigger-local`
+# (see docker/config/s2-spec.json). Comment these out to fall back to v1
+# (Redis-only) streams; Sessions and chat.agent then become unavailable.
+REALTIME_STREAMS_S2_BASIN=trigger-local
+REALTIME_STREAMS_S2_ACCESS_TOKEN=ignored
+REALTIME_STREAMS_S2_ENDPOINT=http://localhost:4566/v1
+REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS=true
+REALTIME_STREAMS_DEFAULT_VERSION=v2
+
+# Running multiple instances side by side (worktrees, branch experiments)
+#
+# Every host port in docker/docker-compose.yml is `${VAR:-default}` and the
+# project name comes from `COMPOSE_PROJECT_NAME`. To stand up a second stack
+# alongside the default one, uncomment the block below in this clone's `.env`
+# (pick any offset that doesn't clash with anything else running), then update
+# the URL/PORT vars further up to match. Default values are commented for
+# reference.
+#
+# --- core (pnpm run docker) ---
+# COMPOSE_PROJECT_NAME=triggerdotdev-docker-alt
+# CONTAINER_PREFIX=alt-
+# POSTGRES_HOST_PORT=15432 # default 5432
+# REDIS_HOST_PORT=16379 # default 6379
+# ELECTRIC_HOST_PORT=13060 # default 3060
+# MINIO_API_HOST_PORT=19005 # default 9005
+# MINIO_CONSOLE_HOST_PORT=19006 # default 9006
+# CLICKHOUSE_HTTP_HOST_PORT=18123 # default 8123
+# CLICKHOUSE_TCP_HOST_PORT=19000 # default 9000
+# S2_HOST_PORT=14566 # default 4566
+# REMIX_APP_PORT=13030 # default 3030
+# --- extras (only needed if you also run `pnpm run docker:full`) ---
+# ELECTRIC_SHARD_1_HOST_PORT=13061 # default 3061
+# CH_UI_HOST_PORT=15521 # default 5521
+# TOXIPROXY_PROXY_HOST_PORT=40303 # default 30303
+# TOXIPROXY_API_HOST_PORT=18474 # default 8474
+# NGINX_H2_HOST_PORT=18443 # default 8443
+# OTEL_GRPC_HOST_PORT=14317 # default 4317
+# OTEL_HTTP_HOST_PORT=14318 # default 4318
+# OTEL_PROMETHEUS_HOST_PORT=18889 # default 8889
+# PROMETHEUS_HOST_PORT=19090 # default 9090
+# GRAFANA_HOST_PORT=13001 # default 3001
+# (and update DATABASE_URL / CLICKHOUSE_URL / REDIS_PORT / APP_ORIGIN /
+# LOGIN_ORIGIN / ELECTRIC_ORIGIN / REALTIME_STREAMS_S2_ENDPOINT to match)
+
 # When the domain is set to `localhost` the CLI deploy command will only --load the image by default and not --push it
 DEPLOY_REGISTRY_HOST=localhost:5000

@@ -106,7 +152,7 @@ POSTHOG_PROJECT_KEY=
 # INTERNAL_OTEL_TRACE_LOGGING_ENABLED=1
 # INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0

-# Enable local observability stack (requires `pnpm run docker` to start otel-collector)
+# Enable local observability stack (requires `pnpm run docker:full` to bring up otel-collector + prometheus + grafana)
 # Uncomment these to send metrics to the local Prometheus via OTEL Collector:
 # INTERNAL_OTEL_METRIC_EXPORTER_ENABLED=1
 # INTERNAL_OTEL_METRIC_EXPORTER_URL=http://localhost:4318/v1/metrics
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: fix
+---
+
+Dedupe the `realtimeStreams` array push on `PUT /realtime/v1/streams/:runId/:target/:streamId` so repeat stream-init calls for the same `(run, streamId)` skip the row UPDATE, mirroring the existing append handler.
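
The dedupe described in this changeset can be sketched roughly as below. This is an illustration under assumptions, not the handler's actual code: `RunRow`, `StreamInitPlan`, and `planStreamInit` are hypothetical names, and the row is modelled in memory. Only the `realtimeStreams` field name comes from the changeset.

```typescript
// Hypothetical in-memory stand-in for the run row; only `realtimeStreams`
// is a name taken from the changeset.
type RunRow = { id: string; realtimeStreams: string[] };

type StreamInitPlan =
  | { kind: "skip" } // stream already registered, no UPDATE needed
  | { kind: "update"; realtimeStreams: string[] };

// Decide whether a stream-init call needs to write to the row at all.
function planStreamInit(run: RunRow, streamId: string): StreamInitPlan {
  if (run.realtimeStreams.includes(streamId)) {
    return { kind: "skip" };
  }
  // Return a new array rather than mutating the input row.
  return { kind: "update", realtimeStreams: [...run.realtimeStreams, streamId] };
}
```

A caller would issue the row UPDATE only for the `update` case, so repeat calls for the same `(run, streamId)` become read-only.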
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: improvement
+---
+
+Group Prisma P1001 ("Can't reach database server") errors into a single Sentry issue via a `beforeSend` fingerprint rule, so DB outages no longer fan out into hundreds of distinct issues that bury other alerts. Adds a small extensible rule table for future collapsing rules.
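
A rule table of the kind this changeset describes might look like the sketch below. It is a guess at the shape, not the code in this commit: `EventLike`, `HintLike`, `FingerprintRule`, `errorCodeOf`, `applyFingerprintRules`, and the fingerprint string are all hypothetical. The types are local stand-ins that mirror only the parts of a Sentry event used here (`fingerprint` and `exception.values`).

```typescript
// Minimal local stand-ins for the Sentry event and hint shapes (assumption:
// only `fingerprint`, `exception.values`, and `originalException` are needed).
type EventLike = {
  fingerprint?: string[];
  exception?: { values?: Array<{ type?: string; value?: string }> };
};
type HintLike = { originalException?: unknown };

type FingerprintRule = {
  name: string;
  matches: (event: EventLike, hint: HintLike) => boolean;
  fingerprint: string[];
};

// Read a string error code off an unknown thrown value, if it has one.
function errorCodeOf(err: unknown): string | undefined {
  if (typeof err !== "object" || err === null) return undefined;
  const rec = err as Record<string, unknown>;
  const code = rec.code ?? rec.errorCode;
  return typeof code === "string" ? code : undefined;
}

// Extensible table: add one entry per family of errors to collapse.
const rules: FingerprintRule[] = [
  {
    name: "prisma-p1001",
    matches: (event, hint) =>
      errorCodeOf(hint.originalException) === "P1001" ||
      (event.exception?.values ?? []).some(
        (v) => v.value?.includes("Can't reach database server") ?? false
      ),
    fingerprint: ["prisma-p1001-db-unreachable"],
  },
];

// First matching rule wins; unmatched events keep default grouping.
function applyFingerprintRules(event: EventLike, hint: HintLike): EventLike {
  const rule = rules.find((r) => r.matches(event, hint));
  if (rule) event.fingerprint = [...rule.fingerprint];
  return event;
}
```

Wired into Sentry's `beforeSend` hook, a function like this would give every matching event the same fingerprint, so a database outage lands in one issue instead of many.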

AGENTS.md

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ See `ai/references/repo.md` for a more complete explanation of the workspaces.
 ```bash
 pnpm run docker
 ```
+Add `:full` (`pnpm run docker:full`) for the optional observability + chaos tooling. See `docker/docker-compose.extras.yml`.
 4. Run database migrations:
 ```bash
 pnpm run db:migrate

CLAUDE.md

Lines changed: 2 additions & 1 deletion
@@ -9,7 +9,8 @@ This is a pnpm 10.33.2 monorepo using Turborepo. Run commands from root with `pn
 **Adding dependencies:** Edit `package.json` directly instead of using `pnpm add`, then run `pnpm i` from the repo root. See `.claude/rules/package-installation.md` for the full process.

 ```bash
-pnpm run docker # Start Docker services (PostgreSQL, Redis, Electric)
+pnpm run docker # Core dev services (Postgres, Redis, Electric, MinIO, ClickHouse, s2-lite)
+# pnpm run docker:full # Same + observability stack (Prometheus, Grafana, OTEL) and chaos tooling
 pnpm run db:migrate # Run database migrations
 pnpm run db:seed # Seed the database (required for reference projects)

CONTRIBUTING.md

Lines changed: 7 additions & 1 deletion
@@ -71,12 +71,14 @@ branch are tagged into a release periodically.

 Feel free to update `SESSION_SECRET` and `MAGIC_LINK_SECRET` as well using the same method.

-8. Start Docker. This starts the required services: Postgres, Redis, Electric, and ClickHouse (the ClickHouse migrator runs once on first start). If this is your first time using Docker, consider going through this [guide](DOCKER_INSTALLATION.md).
+8. Start Docker. This starts the core dev services (Postgres, Redis, Electric, MinIO, ClickHouse, s2-lite) and runs the ClickHouse migrator once on first start. If this is your first time using Docker, consider going through this [guide](DOCKER_INSTALLATION.md).

 ```
 pnpm run docker
 ```

+For the observability stack (Prometheus, Grafana, OTEL collector) and other optional tooling (Toxiproxy, nginx-h2, ch-ui, extra electric shard), use `pnpm run docker:full` instead. See `docker/docker-compose.extras.yml` for the full list.
+
 9. Migrate the database
 ```
 pnpm run db:migrate
@@ -300,3 +302,7 @@ The process running on port `3030` should be destroyed.
 ```sh
 sudo kill -9 <PID>
 ```
+
+### Running two clones side by side (worktree, branch experiment)
+
+The default `pnpm run docker` uses the project name `triggerdotdev-docker` and the standard host ports (5432, 6379, 3060, 4566, 8123, 9000, 9005, 9006). To stand up a second instance in another clone without clashing, set a different `COMPOSE_PROJECT_NAME` and the offset host ports in that clone's `.env`. The "Running multiple instances side by side" block in `.env.example` lists every overridable env var with its default for reference; uncomment the lines you need and update `DATABASE_URL` / `CLICKHOUSE_URL` / `REDIS_PORT` / `APP_ORIGIN` / `LOGIN_ORIGIN` / `ELECTRIC_ORIGIN` / `REALTIME_STREAMS_S2_ENDPOINT` to match.
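
The offset scheme in the `.env.example` block (every commented example is its default plus 10000) can be generated rather than typed by hand. The helper below is only a sketch: `coreDefaults` copies the core defaults listed in `.env.example`, while `offsetPorts` and `toEnvLines` are hypothetical names that do not exist in the repo.

```typescript
// Default host ports from the core block of `.env.example`.
const coreDefaults: Record<string, number> = {
  POSTGRES_HOST_PORT: 5432,
  REDIS_HOST_PORT: 6379,
  ELECTRIC_HOST_PORT: 3060,
  MINIO_API_HOST_PORT: 9005,
  MINIO_CONSOLE_HOST_PORT: 9006,
  CLICKHOUSE_HTTP_HOST_PORT: 8123,
  CLICKHOUSE_TCP_HOST_PORT: 9000,
  S2_HOST_PORT: 4566,
  REMIX_APP_PORT: 3030,
};

// Shift every port by `offset`, rejecting results outside the valid TCP range.
function offsetPorts(defaults: Record<string, number>, offset: number): Record<string, number> {
  const shifted: Record<string, number> = {};
  for (const [name, port] of Object.entries(defaults)) {
    const next = port + offset;
    if (!Number.isInteger(next) || next < 1 || next > 65535) {
      throw new RangeError(`${name}: ${next} is not a valid port`);
    }
    shifted[name] = next;
  }
  return shifted;
}

// Render the overrides as `.env` lines for the second clone.
function toEnvLines(projectName: string, ports: Record<string, number>): string[] {
  return [
    `COMPOSE_PROJECT_NAME=${projectName}`,
    ...Object.entries(ports).map(([name, port]) => `${name}=${port}`),
  ];
}
```

For example, `toEnvLines("triggerdotdev-docker-alt", offsetPorts(coreDefaults, 10000))` yields the same core values shown commented out in `.env.example`. The URL and origin variables still need updating separately.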

apps/webapp/app/components/runs/v3/agent/AgentView.tsx

Lines changed: 111 additions & 13 deletions
@@ -1,5 +1,5 @@
 import type { UIMessage } from "@ai-sdk/react";
-import { SSEStreamSubscription } from "@trigger.dev/core/v3";
+import { ChatSnapshotV1Schema, SSEStreamSubscription } from "@trigger.dev/core/v3";
 import { useEffect, useMemo, useRef, useState } from "react";
 import { Paragraph } from "~/components/primitives/Paragraph";
 import { Spinner } from "~/components/primitives/Spinner";
@@ -27,6 +27,15 @@ export type AgentViewAuth = {
    * channel and is merged in by the AgentView subscription.
    */
   initialMessages: UIMessage[];
+  /**
+   * Presigned GET URL for the session's chat-snapshot S3 blob (written
+   * by the agent after each turn-complete; see `ChatSnapshotV1`).
+   * Optional — sessions that registered a `hydrateMessages` hook skip
+   * snapshot writes and the URL fetch will 404. In that case the
+   * dashboard falls back to seq=0 SSE (which, post-trim, shows only the
+   * most recent turn). Generated server-side by `SessionPresenter`.
+   */
+  snapshotPresignedUrl?: string;
 };

 /**
@@ -81,6 +90,7 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
     projectSlug: project.slug,
     envSlug: environment.slug,
     initialMessages: agentView.initialMessages,
+    snapshotPresignedUrl: agentView.snapshotPresignedUrl,
   });

   // Sticky-bottom auto-scroll: walks up to find the inspector's scroll
@@ -120,13 +130,19 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
  * - `kind: "stop"` is a stop signal — no messages, nothing to render
  * here, so it's filtered.
  *
+ * Wire payloads are slim-wire (one new UIMessage per record, on
+ * `payload.message`). The legacy `payload.messages` array shape is kept
+ * here as a fallback so any historical records on a long-lived session
+ * still render.
+ *
  * The server wraps records in `{data, id}` and writes `data` as a JSON
  * string; SSE v2 delivers the parsed string back. {@link parseChunkPayload}
  * re-parses to recover the object.
  */
 type InputStreamChunk = {
   kind?: "message" | "stop";
   payload?: {
+    message?: { id?: string; role?: string; parts?: unknown[] };
     messages?: Array<{ id?: string; role?: string; parts?: unknown[] }>;
     trigger?: string;
   };
@@ -217,13 +233,15 @@ function useAgentSessionMessages({
   projectSlug,
   envSlug,
   initialMessages,
+  snapshotPresignedUrl,
 }: {
   sessionId: string;
   apiOrigin: string;
   orgSlug: string;
   projectSlug: string;
   envSlug: string;
   initialMessages: UIMessage[];
+  snapshotPresignedUrl?: string;
 }): UIMessage[] {
   // Seed with the user messages from the run's task payload.
   const seedMessages = useMemo(
@@ -285,27 +303,92 @@ function useAgentSessionMessages({
     const outputUrl = `${sessionBase}/out`;
     const inputUrl = `${sessionBase}/in`;

+    /**
+     * Try to seed `pendingRef` from the agent's S3 snapshot blob and return
+     * the snapshot's `lastOutEventId` so the `.out` SSE subscription resumes
+     * just past the snapshot. Returns undefined for sessions that don't
+     * have a snapshot (e.g. `hydrateMessages` customers, or sessions that
+     * have never completed a turn).
+     */
+    const loadSnapshot = async (): Promise<string | undefined> => {
+      if (!snapshotPresignedUrl) return undefined;
+      try {
+        const resp = await fetch(snapshotPresignedUrl, { signal: abort.signal });
+        if (!resp.ok) return undefined;
+        const json = (await resp.json()) as unknown;
+        const parsed = ChatSnapshotV1Schema.safeParse(json);
+        if (!parsed.success) return undefined;
+        const snapshot = parsed.data;
+        // Preserve the snapshot's array order in the final render by
+        // giving each message a unique, monotonically increasing
+        // timestamp from `(savedAt - count + index)`. Real chunk
+        // timestamps from the SSE path use S2 arrival ms (positive
+        // numbers in the present), so anything below `savedAt` sorts
+        // before live chunks while preserving snapshot order among
+        // themselves.
+        const count = snapshot.messages.length;
+        snapshot.messages.forEach((raw, i) => {
+          const message = raw as UIMessage;
+          if (!message?.id) return;
+          // The snapshot's seed wins over the task-payload seed for any
+          // overlapping ids (the snapshot represents the agent's
+          // canonical accumulator, post-turn).
+          pendingRef.current.set(message.id, message);
+          if (!timestampsRef.current.has(message.id)) {
+            timestampsRef.current.set(message.id, snapshot.savedAt - count + i);
+          }
+        });
+        scheduleFlush.current();
+        return snapshot.lastOutEventId;
+      } catch {
+        // 404 / network / parse / abort — fall back to seq=0 SSE
+        return undefined;
+      }
+    };
+
+    const outputSubOptions = (lastEventId: string | undefined) =>
+      ({
+        signal: abort.signal,
+        timeoutInSeconds: 120,
+        ...(lastEventId !== undefined ? { lastEventId } : {}),
+      }) as const;
+
     const commonSubOptions = {
       signal: abort.signal,
       timeoutInSeconds: 120,
     } as const;

     // ---- Output stream: assistant messages ---------------------------------
     //
-    // The output stream delivers UIMessageChunks interleaved with
-    // Trigger-specific control chunks (`trigger:turn-complete`, etc.). We
-    // filter the control chunks and fold everything else into an assistant
-    // `UIMessage` via our own `applyOutputChunk` accumulator — the AI SDK's
-    // `readUIMessageStream` helper is only available in `ai@6`, and the
-    // webapp is pinned to `ai@4`, so we re-implement just the chunk types
-    // that `renderPart` actually displays.
+    // The output stream delivers data records (UIMessageChunks) interleaved
+    // with Trigger control records (`turn-complete`, `upgrade-required`) and
+    // S2 command records (`trim`). Control + command records ride on
+    // `record.headers` with empty bodies; the SSE parser strips S2 command
+    // records entirely, and control records arrive with `value.chunk ===
+    // undefined`, which `parseChunkPayload` drops below.
+    //
+    // We fold everything else into an assistant `UIMessage` via our own
+    // `applyOutputChunk` accumulator — the AI SDK's `readUIMessageStream`
+    // helper is only available in `ai@6`, and the webapp is pinned to
+    // `ai@4`, so we re-implement just the chunk types that `renderPart`
+    // actually displays.
     //
     // We capture the **server timestamp of each assistant message's first
     // `start` chunk** so later sort-by-timestamp merges with the input
     // stream correctly.
     const runOutput = async () => {
       try {
-        const sub = new SSEStreamSubscription(outputUrl, commonSubOptions);
+        // Seed messages from the snapshot first (if available), then
+        // resume the SSE from the snapshot's last event id so we don't
+        // re-stream chunks already represented in the snapshot. If no
+        // snapshot exists (no URL, 404, parse failure), the SSE opens
+        // at seq=0 — which, post-trim, contains roughly one turn of
+        // records (acceptable fallback for `hydrateMessages` sessions
+        // and fresh sessions).
+        const snapshotLastEventId = await loadSnapshot();
+        if (abort.signal.aborted) return;
+
+        const sub = new SSEStreamSubscription(outputUrl, outputSubOptions(snapshotLastEventId));
         const raw = await sub.subscribe();
         const reader = raw.getReader();

@@ -318,6 +401,12 @@ function useAgentSessionMessages({

           const chunk = parseChunkPayload(value.chunk) as OutputChunk | null;
           if (!chunk || typeof chunk.type !== "string") continue;
+          // Legacy belt-and-suspenders: prior versions of the SDK
+          // emitted `trigger:turn-complete` / `trigger:upgrade-required`
+          // as data records (`type` field). Current versions use
+          // header-form control records, which `parseChunkPayload`
+          // drops above. Keep this filter to handle any in-flight
+          // sessions whose `.out` was populated by the older SDK.
           if (chunk.type.startsWith("trigger:")) continue;

           if (chunk.type === "start") {
@@ -413,9 +502,18 @@ function useAgentSessionMessages({
           const chunk = parseChunkPayload(value.chunk) as InputStreamChunk | null;
           if (!chunk || chunk.kind !== "message") continue;
           const payload = chunk.payload;
-          if (!payload || !Array.isArray(payload.messages)) continue;
-
-          const incomingUsers = payload.messages.filter(
+          if (!payload) continue;
+
+          // Slim-wire is one UIMessage on `payload.message`; legacy
+          // payloads carried an array on `payload.messages`. Accept
+          // either so historical records on a long-lived session still
+          // render.
+          const candidates = Array.isArray(payload.messages)
+            ? payload.messages
+            : payload.message
+              ? [payload.message]
+              : [];
+          const incomingUsers = candidates.filter(
             (m): m is UIMessage =>
               m != null && (m as { role?: string }).role === "user" && typeof m.id === "string"
           );
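
The slim-wire/legacy fallback in this hunk can be read in isolation as a small normalisation step. The sketch below restates it outside React so it can be exercised directly. `WireMessage`, `InputPayload`, and `userMessagesFrom` are hypothetical names chosen for illustration; the component itself works with `UIMessage` and `InputStreamChunk`.

```typescript
// Local stand-ins for the wire shapes (assumption: only these fields matter).
type WireMessage = { id?: string; role?: string; parts?: unknown[] };
type InputPayload = { message?: WireMessage; messages?: WireMessage[] };

// Accept either the legacy array shape or the slim-wire single message,
// then keep only user messages that carry a string id.
function userMessagesFrom(
  payload: InputPayload | undefined
): Array<WireMessage & { id: string }> {
  if (!payload) return [];
  const candidates = Array.isArray(payload.messages)
    ? payload.messages
    : payload.message
      ? [payload.message]
      : [];
  return candidates.filter(
    (m): m is WireMessage & { id: string } =>
      m != null && m.role === "user" && typeof m.id === "string"
  );
}
```

As in the diff, the legacy array takes precedence when both fields are present, which keeps historical records rendering exactly as before.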
@@ -454,7 +552,7 @@ function useAgentSessionMessages({
         pendingTimerRef.current = null;
       }
     };
-  }, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug]);
+  }, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug, snapshotPresignedUrl]);

   return useMemo(() => {
     const timestamps = timestampsRef.current;