Skip to content

Commit 1e1ff9e

Browse files
committed
refactor(supervisor): op + kind first-class on State
1 parent a418a5a commit 1e1ff9e

8 files changed

Lines changed: 59 additions & 27 deletions

File tree

apps/supervisor/src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ class ManagedSupervisor {
259259
await runWideEvent(
260260
{
261261
...this.wideEventOpts,
262+
op: "dequeue",
263+
kind: "inbound",
262264
traceparent,
263265
setup: (state) => {
264266
setMeta(state, "run_id", message.run.friendlyId);
@@ -269,8 +271,6 @@ class ManagedSupervisor {
269271
setMeta(state, "deployment_id", message.deployment.friendlyId);
270272
}
271273
setMeta(state, "machine_preset", message.run.machine.name);
272-
state.extras.op = "dequeue";
273-
state.extras.kind = "inbound";
274274
state.extras.iteration = "dequeue";
275275
state.extras.dequeue_response_ms = dequeueResponseMs;
276276
state.extras.polling_interval_ms = pollingIntervalMs;

apps/supervisor/src/services/computeSnapshotService.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ export class ComputeSnapshotService {
7676
this.timerWheel.submit(runFriendlyId, data);
7777
emitOneShot({
7878
...this.wideEventOpts,
79+
op: "snapshot.schedule",
80+
kind: "event",
7981
populate: (state) => {
80-
state.extras.op = "snapshot.schedule";
81-
state.extras.kind = "event";
8282
state.meta.run_id = runFriendlyId;
8383
state.meta.snapshot_id = data.snapshotFriendlyId;
8484
state.extras.runner_id = data.runnerId;
@@ -98,9 +98,9 @@ export class ComputeSnapshotService {
9898
if (cancelled) {
9999
emitOneShot({
100100
...this.wideEventOpts,
101+
op: "snapshot.canceled",
102+
kind: "event",
101103
populate: (state) => {
102-
state.extras.op = "snapshot.canceled";
103-
state.extras.kind = "event";
104104
state.meta.run_id = runFriendlyId;
105105
},
106106
});
@@ -121,7 +121,6 @@ export class ComputeSnapshotService {
121121
// become extras/meta on the same wide event - no nested emission.
122122
const state = fromContext();
123123
if (state) {
124-
state.extras.op = "snapshot.callback";
125124
state.extras["snapshot.status"] = body.status;
126125
if (body.instance_id) state.extras["snapshot.instance_id"] = body.instance_id;
127126
if (body.duration_ms !== undefined) state.extras["snapshot.duration_ms"] = body.duration_ms;
@@ -247,9 +246,9 @@ export class ComputeSnapshotService {
247246
await runWideEvent(
248247
{
249248
...this.wideEventOpts,
249+
op: "snapshot.dispatch",
250+
kind: "scheduled",
250251
setup: (state) => {
251-
state.extras.op = "snapshot.dispatch";
252-
state.extras.kind = "scheduled";
253252
state.meta.run_id = snapshot.runFriendlyId;
254253
state.meta.snapshot_id = snapshot.snapshotFriendlyId;
255254
state.extras.runner_id = snapshot.runnerId;

apps/supervisor/src/wideEvents/emit.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ export function emit(state: State): void {
2727
appendIfSet(out, "region", state.region);
2828
appendIfSet(out, "node_id", state.nodeId);
2929

30+
appendIfSet(out, "op", state.op);
31+
appendIfSet(out, "kind", state.kind);
32+
3033
out.ok = state.ok;
3134
if (state.statusCode !== 0) out.status = state.statusCode;
3235
out.duration_ms = state.durationMs;

apps/supervisor/src/wideEvents/middleware.test.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ describe("runWideEvent", () => {
2020
it("emits one event with ok=true when no statusCode is set", async () => {
2121
const lines = await captureStdout(async () => {
2222
await runWideEvent(
23-
{ service: "supervisor", env: {}, enabled: true, route: "/x", method: "POST" },
23+
{ service: "supervisor", env: {}, enabled: true, op: "test", route: "/x", method: "POST" },
2424
async () => undefined
2525
);
2626
});
@@ -39,7 +39,7 @@ describe("runWideEvent", () => {
3939
it("derives ok from statusCode set via finalize", async () => {
4040
const lines = await captureStdout(async () => {
4141
await runWideEvent(
42-
{ service: "supervisor", env: {}, enabled: true },
42+
{ service: "supervisor", env: {}, enabled: true, op: "test" },
4343
async () => undefined,
4444
(state) => {
4545
state.statusCode = 200;
@@ -56,7 +56,7 @@ describe("runWideEvent", () => {
5656
it("treats 4xx as ok=false", async () => {
5757
const lines = await captureStdout(async () => {
5858
await runWideEvent(
59-
{ service: "supervisor", env: {}, enabled: true },
59+
{ service: "supervisor", env: {}, enabled: true, op: "test" },
6060
async () => undefined,
6161
(state) => {
6262
state.statusCode = 400;
@@ -73,7 +73,7 @@ describe("runWideEvent", () => {
7373
it("emits ok=false with error.kind=internal on throw", async () => {
7474
const lines = await captureStdout(async () => {
7575
await runWideEvent(
76-
{ service: "supervisor", env: {}, enabled: true },
76+
{ service: "supervisor", env: {}, enabled: true, op: "test" },
7777
async () => {
7878
throw new Error("boom");
7979
}
@@ -91,7 +91,7 @@ describe("runWideEvent", () => {
9191
it("threads state through AsyncLocalStorage", async () => {
9292
const lines = await captureStdout(async () => {
9393
await runWideEvent(
94-
{ service: "supervisor", env: {}, enabled: true },
94+
{ service: "supervisor", env: {}, enabled: true, op: "test" },
9595
async () => {
9696
setMeta(fromContext(), "run_id", "run_abc");
9797
}
@@ -108,7 +108,7 @@ describe("runWideEvent", () => {
108108
const tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
109109
const lines = await captureStdout(async () => {
110110
await runWideEvent(
111-
{ service: "supervisor", env: {}, enabled: true, traceparent: tp },
111+
{ service: "supervisor", env: {}, enabled: true, op: "test", traceparent: tp },
112112
async () => undefined
113113
);
114114
});
@@ -124,7 +124,7 @@ describe("runWideEvent", () => {
124124
{
125125
service: "supervisor",
126126
env: {},
127-
enabled: true,
127+
enabled: true, op: "test",
128128
setup: (state) => {
129129
state.meta.run_id = "run_abc";
130130
state.extras.iteration = "dequeue";
@@ -144,7 +144,7 @@ describe("runWideEvent", () => {
144144
let seenState: ReturnType<typeof fromContext> = null;
145145
const lines = await captureStdout(async () => {
146146
await runWideEvent(
147-
{ service: "supervisor", env: {}, enabled: false },
147+
{ service: "supervisor", env: {}, enabled: false, op: "test" },
148148
async () => {
149149
seenState = fromContext();
150150
}
@@ -159,7 +159,7 @@ describe("runWideEvent", () => {
159159
await Promise.all(
160160
["a", "b", "c"].map((tag) =>
161161
runWideEvent(
162-
{ service: "supervisor", env: {}, enabled: true },
162+
{ service: "supervisor", env: {}, enabled: true, op: "test" },
163163
async () => {
164164
const s = fromContext();
165165
if (!s) throw new Error("no state");
@@ -182,7 +182,7 @@ describe("emitOneShot", () => {
182182
emitOneShot({
183183
service: "supervisor",
184184
env: {},
185-
enabled: true,
185+
enabled: true, op: "test",
186186
populate: (s) => {
187187
s.meta.run_id = "run_abc";
188188
s.extras.event = "run:start";
@@ -200,7 +200,7 @@ describe("emitOneShot", () => {
200200

201201
it("emits nothing when disabled", async () => {
202202
const lines = await captureStdout(() => {
203-
emitOneShot({ service: "supervisor", env: {}, enabled: false });
203+
emitOneShot({ service: "supervisor", env: {}, enabled: false, op: "test" });
204204
});
205205
expect(lines).toHaveLength(0);
206206
});

apps/supervisor/src/wideEvents/middleware.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ export type WideEventOptions = {
1818

1919
/** Per-invocation options layered on top of `WideEventOptions`. */
2020
export type WideEventLifecycleOptions = WideEventOptions & {
21+
/** Operation discriminator (`instance.create`, `dequeue`, ...). Required. */
22+
op: string;
23+
/** Event shape: `inbound` | `outbound` | `event` | `scheduled`. Optional. */
24+
kind?: string;
2125
/** Route template (HTTP only) captured into `extras.route`. */
2226
route?: string;
2327
/** HTTP method captured into `extras.method`. */
@@ -54,6 +58,8 @@ export async function runWideEvent<T>(
5458
env: opts.env,
5559
inboundRequestId: opts.inboundRequestId,
5660
traceparent: opts.traceparent,
61+
op: opts.op,
62+
kind: opts.kind,
5763
});
5864
if (opts.route) state.extras.route = opts.route;
5965
if (opts.method) state.extras.method = opts.method;
@@ -94,6 +100,8 @@ export async function runWideEvent<T>(
94100
*/
95101
export function emitOneShot(
96102
opts: WideEventOptions & {
103+
op: string;
104+
kind?: string;
97105
traceparent?: string;
98106
populate?: (state: State) => void;
99107
}
@@ -103,6 +111,8 @@ export function emitOneShot(
103111
service: opts.service,
104112
env: opts.env,
105113
traceparent: opts.traceparent,
114+
op: opts.op,
115+
kind: opts.kind,
106116
});
107117
if (opts.populate) opts.populate(state);
108118
state.ok = true;

apps/supervisor/src/wideEvents/new.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ export type NewStateOptions = {
3737
inboundRequestId?: string;
3838
/** Optional inbound W3C traceparent (HTTP header, queue message field). */
3939
traceparent?: string;
40+
/** Operation discriminator. Dotted `noun.verb`. Defaults to empty (set later). */
41+
op?: string;
42+
/** Event shape: `inbound` | `outbound` | `event` | `scheduled`. Defaults to empty. */
43+
kind?: string;
4044
};
4145

4246
/**
@@ -62,6 +66,8 @@ export function newState(opts: NewStateOptions): State {
6266
commitSha: opts.env.commitSha,
6367
region: opts.env.region,
6468
nodeId: opts.env.nodeId,
69+
op: opts.op ?? "",
70+
kind: opts.kind ?? "",
6571
meta: {},
6672
phases: [],
6773
ok: false,

apps/supervisor/src/wideEvents/state.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,22 @@ export type State = {
2222
region?: string;
2323
nodeId?: string;
2424

25+
/**
26+
* Operation discriminator. Dotted `noun.verb` (e.g. `instance.create`,
27+
* `snapshot.dispatch`). Low cardinality - bounded set per service, not
28+
* unbounded. Empty allowed during construction but expected to be set
29+
* before emit.
30+
*/
31+
op: string;
32+
33+
/**
34+
* Event shape. `inbound` for received requests, `outbound` for outgoing
35+
* calls, `event` for ambient occurrences with no meaningful duration,
36+
* `scheduled` for timer-driven work. Empty allowed; omitted from emit
37+
* when empty.
38+
*/
39+
kind: string;
40+
2541
// Caller-attached opaque metadata, flattened to `meta.<key>` on emit.
2642
meta: Record<string, string>;
2743

apps/supervisor/src/workloadServer/index.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -196,15 +196,13 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
196196
{
197197
...this.wideEventOpts,
198198
enabled,
199+
op,
200+
kind: "inbound",
199201
route,
200202
method,
201203
traceparent: this.headerValueFromRequest(ctx.req, "traceparent"),
202204
inboundRequestId: this.headerValueFromRequest(ctx.req, "x-request-id"),
203-
setup: (state) => {
204-
state.extras.op = op;
205-
state.extras.kind = "inbound";
206-
this.attachRouteMeta(state, ctx.params);
207-
},
205+
setup: (state) => this.attachRouteMeta(state, ctx.params),
208206
},
209207
fn,
210208
(state) => {
@@ -679,9 +677,9 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
679677
) => {
680678
emitOneShot({
681679
...this.wideEventOpts,
680+
op: event === "run_connected" ? "socket.run.connected" : "socket.run.disconnected",
681+
kind: "event",
682682
populate: (state) => {
683-
state.extras.op = event === "run_connected" ? "socket.run.connected" : "socket.run.disconnected";
684-
state.extras.kind = "event";
685683
state.extras.event = event;
686684
setMeta(state, "run_id", friendlyId);
687685
if (socket.data.deploymentId) {

0 commit comments

Comments
 (0)