diff --git a/pkgs/client/__tests__/e2e/skipped-steps.test.ts b/pkgs/client/__tests__/e2e/skipped-steps.test.ts new file mode 100644 index 000000000..2d3afed7e --- /dev/null +++ b/pkgs/client/__tests__/e2e/skipped-steps.test.ts @@ -0,0 +1,268 @@ +import { describe, it, expect } from 'vitest'; +import { withPgNoTransaction } from '../helpers/db.js'; +import { createTestSupabaseClient } from '../helpers/setup.js'; +import { createTestFlow } from '../helpers/fixtures.js'; +import { grantMinimalPgflowPermissions } from '../helpers/permissions.js'; +import { PgflowClient } from '../../src/lib/PgflowClient.js'; +import { FlowStepStatus } from '../../src/lib/types.js'; +import { PgflowSqlClient } from '@pgflow/core'; +import { readAndStart } from '../helpers/polling.js'; +import { cleanupFlow } from '../helpers/cleanup.js'; +import { createEventTracker } from '../helpers/test-utils.js'; +import { skipStep } from '../helpers/skip-step.js'; + +/** + * Tests for skipped step event handling in the client. + * + * Skipped steps can occur when: + * - A step's condition evaluates to false (condition_unmet) + * - A dependency was skipped, causing cascading skips (dependency_skipped) + * - A handler fails during evaluation (handler_failed) + * + * These tests verify the client correctly: + * - Receives and processes skipped broadcast events + * - Updates step state with skipped_at and skip_reason + * - Treats skipped as a terminal state + * - Handles waitForStatus(Skipped) correctly + */ +describe('Skipped Step Handling', () => { + it( + 'client handles skipped step state from database snapshot', + withPgNoTransaction(async (sql) => { + // This test verifies the client correctly handles skipped step state + // when fetched from the database (e.g., on reconnect or late join) + + const testFlow = createTestFlow('skip_snap'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'will_skip_step')`; + + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); + + // Start the flow + const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); + const step = run.step('will_skip_step'); + + // Verify initial state is Started (root step) + expect(step.status).toBe(FlowStepStatus.Started); + + // Directly call pgflow.skip_step to simulate the step being skipped + // This mimics what would happen when a condition evaluates to false + await skipStep(sql, run.run_id, 'will_skip_step', 'condition_unmet'); + + // Wait for the skipped event to be received + await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 }); + + // Verify skipped state + expect(step.status).toBe(FlowStepStatus.Skipped); + expect(step.skipped_at).toBeInstanceOf(Date); + expect(step.skip_reason).toBe('condition_unmet'); + + // Verify output is null for skipped steps (per design decision Q1) + expect(step.output).toBeNull(); + + await supabaseClient.removeAllChannels(); + }), + { timeout: 15000 } + ); + + it( + 'receives skipped broadcast event and updates step state', + withPgNoTransaction(async (sql) => { + // This test verifies the client receives and processes skipped events + // broadcast via Supabase realtime + + const testFlow = createTestFlow('skip_broadcast'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'skipped_step')`; + + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); + + const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); + const step = run.step('skipped_step'); + + // Set up event tracking BEFORE the skip happens + const tracker = createEventTracker(); + step.on('*', tracker.callback); + + // Skip the step + await skipStep(sql, run.run_id, 'skipped_step', 'handler_failed'); + + // Wait for the skipped status + await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 }); + + // Verify we received the skipped event + expect(tracker).toHaveReceivedEvent('step:skipped'); + expect(tracker).toHaveReceivedEvent('step:skipped', { + run_id: run.run_id, + step_slug: 'skipped_step', + status: FlowStepStatus.Skipped, + skip_reason: 'handler_failed', + }); + + // Verify step state + expect(step.status).toBe(FlowStepStatus.Skipped); + expect(step.skip_reason).toBe('handler_failed'); + + await supabaseClient.removeAllChannels(); + }), + { timeout: 15000 } + ); + + it( + 'skipped is a terminal state - no further status changes', + withPgNoTransaction(async (sql) => { + // Verify that once a step is skipped, it cannot transition to other states + + const testFlow = createTestFlow('skip_terminal'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'terminal_step')`; + + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); + + const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); + const step = run.step('terminal_step'); + + // Skip the step + await skipStep(sql, run.run_id, 'terminal_step', 'dependency_skipped'); + + await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 }); + + // Store original skipped_at + const originalSkippedAt = step.skipped_at; + + // Set up tracker for any subsequent events + const tracker = createEventTracker(); + step.on('*', tracker.callback); + + // Try to complete the step (should be ignored by the database, no event broadcast) + // The database should reject this, but even if an event comes through, + // the client should ignore it due to terminal state protection + const sqlClient = new PgflowSqlClient(sql); + try { + // This should fail at the database level + const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 0.1, 1); + if (tasks.length > 0) { + await sqlClient.completeTask(tasks[0], { result: 'should not work' }); + } + } catch (e) { + // Expected - skipped steps shouldn't have tasks + } + + // Give time for any potential events + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify step is still skipped with same timestamp + expect(step.status).toBe(FlowStepStatus.Skipped); + expect(step.skipped_at).toEqual(originalSkippedAt); + + // Verify no additional events were processed + expect(tracker).toHaveReceivedTotalEvents(0); + + await supabaseClient.removeAllChannels(); + }), + { timeout: 15000 } + ); + + it( + 'waitForStatus(Skipped) resolves when step is skipped', + withPgNoTransaction(async (sql) => { + // Verify waitForStatus works correctly with Skipped status + + const testFlow = createTestFlow('wait_skip'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'wait_step')`; + + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); + + const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); + const step = run.step('wait_step'); + + // Start waiting for skipped BEFORE the skip happens + const waitPromise = step.waitForStatus(FlowStepStatus.Skipped, { + timeoutMs: 10000, + }); + + // Skip the step after a small delay + setTimeout(async () => { + await skipStep(sql, run.run_id, 'wait_step', 'condition_unmet'); + }, 100); + + // Wait should resolve with the step + const result = await waitPromise; + expect(result).toBe(step); + expect(result.status).toBe(FlowStepStatus.Skipped); + expect(result.skip_reason).toBe('condition_unmet'); + + await supabaseClient.removeAllChannels(); + }), + { timeout: 15000 } + ); + + it( + 'handles all skip reasons correctly', + withPgNoTransaction(async (sql) => { + // Verify all three skip reasons are handled correctly + + const skipReasons = [ + 'condition_unmet', + 'handler_failed', + 'dependency_skipped', + ] as const; + + for (const skipReason of skipReasons) { + const testFlow = createTestFlow(`skip_${skipReason}`); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'reason_step')`; + + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); + + const run = await pgflowClient.startFlow(testFlow.slug, { + test: 'data', + }); + const step = run.step('reason_step'); + + // Skip with specific reason + await skipStep(sql, run.run_id, 'reason_step', skipReason); + + await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 }); + + // Verify the skip reason was captured correctly + expect(step.status).toBe(FlowStepStatus.Skipped); + expect(step.skip_reason).toBe(skipReason); + + await supabaseClient.removeAllChannels(); + } + }), + { timeout: 45000 } + ); +}); diff --git a/pkgs/client/__tests__/helpers/event-factories.ts b/pkgs/client/__tests__/helpers/event-factories.ts index 7e95f75a8..350e9ae67 100644 --- a/pkgs/client/__tests__/helpers/event-factories.ts +++ b/pkgs/client/__tests__/helpers/event-factories.ts @@ -7,6 +7,7 @@ import type { BroadcastStepStartedEvent, BroadcastStepCompletedEvent, BroadcastStepFailedEvent, + BroadcastStepSkippedEvent, } from '../../src/lib/types'; /** @@ -98,3 +99,17 @@ export function createStepFailedEvent( ...overrides, }; } + +export function createStepSkippedEvent( + overrides: Partial = {} +): BroadcastStepSkippedEvent { + return { + event_type: 'step:skipped', + run_id: '123e4567-e89b-12d3-a456-426614174000', + step_slug: 'test-step', + status: FlowStepStatus.Skipped, + skipped_at: new Date().toISOString(), + skip_reason: 'condition_unmet', + ...overrides, + }; +} diff --git a/pkgs/client/__tests__/helpers/fixtures.ts b/pkgs/client/__tests__/helpers/fixtures.ts index a570f9c0b..31288f586 100644 --- a/pkgs/client/__tests__/helpers/fixtures.ts +++ b/pkgs/client/__tests__/helpers/fixtures.ts @@ -4,10 +4,12 @@ export function createTestFlow(flowSlug?: string) { const uniqueSuffix = `${Date.now()}_${Math.random() .toString(36) .substr(2, 5)}`; + + const maxBaseLength = 48 - uniqueSuffix.length - 1; + const baseSlug = flowSlug ? flowSlug.slice(0, maxBaseLength) : 'test_flow'; + return { - slug: flowSlug - ? `${flowSlug}_${uniqueSuffix}` - : `test_flow_${uniqueSuffix}`, + slug: `${baseSlug}_${uniqueSuffix}`, options: {}, }; } diff --git a/pkgs/client/__tests__/helpers/skip-step.ts b/pkgs/client/__tests__/helpers/skip-step.ts new file mode 100644 index 000000000..2b43911ff --- /dev/null +++ b/pkgs/client/__tests__/helpers/skip-step.ts @@ -0,0 +1,19 @@ +import type postgres from 'postgres'; + +/** + * Skip a step using the internal _cascade_force_skip_steps function. + * This is a test helper that wraps the internal function. + * If pgflow.skip_step() is exposed publicly later, swap implementation here. + */ +export async function skipStep( + sql: postgres.Sql, + runId: string, + stepSlug: string, + skipReason: 'condition_unmet' | 'handler_failed' | 'dependency_skipped' +): Promise { + await sql`SELECT pgflow._cascade_force_skip_steps( + ${runId}::uuid, + ${stepSlug}::text, + ${skipReason}::text + )`; +} diff --git a/pkgs/client/__tests__/helpers/state-factories.ts b/pkgs/client/__tests__/helpers/state-factories.ts index 31786d91a..2eac1fa0f 100644 --- a/pkgs/client/__tests__/helpers/state-factories.ts +++ b/pkgs/client/__tests__/helpers/state-factories.ts @@ -22,7 +22,9 @@ export function createRunRow(overrides: Partial = {}): RunRow { }; } -export function createStepStateRow(overrides: Partial = {}): StepStateRow { +export function createStepStateRow( + overrides: Partial = {} +): StepStateRow { return { run_id: '123e4567-e89b-12d3-a456-426614174000', step_slug: 'test-step', @@ -30,16 +32,22 @@ export function createStepStateRow(overrides: Partial = {}): StepS started_at: null, completed_at: null, failed_at: null, + skipped_at: null, error_message: null, + skip_reason: null, created_at: new Date().toISOString(), flow_slug: 'test-flow', remaining_deps: 0, remaining_tasks: 1, + initial_tasks: null, + output: null, ...overrides, }; } -export function createFlowRun(overrides: Partial[0]> = {}): FlowRun { +export function createFlowRun( + overrides: Partial[0]> = {} +): FlowRun { return new FlowRun({ run_id: '123e4567-e89b-12d3-a456-426614174000', flow_slug: 'test-flow', @@ -56,7 +64,9 @@ export function createFlowRun(overrides: Partial[0]> = {}): FlowStep { +export function createFlowStep( + overrides: Partial[0]> = {} +): FlowStep { return new FlowStep({ run_id: '123e4567-e89b-12d3-a456-426614174000', step_slug: 'test-step' as any, @@ -67,6 +77,8 @@ export function createFlowStep(overrides: Partial = {} ): RunRow { const baseRun = createRunRow(overrides); - + switch (status) { case 'started': return { @@ -87,7 +99,7 @@ export function createRunInState( status: 'started', started_at: new Date().toISOString(), }; - + case 'completed': return { ...baseRun, @@ -97,7 +109,7 @@ export function createRunInState( output: { result: 'success' } as Json, remaining_steps: 0, }; - + case 'failed': return { ...baseRun, @@ -112,22 +124,22 @@ export function createRunInState( * Creates a step in a specific state with appropriate timestamps and fields */ export function createStepInState( - status: 'created' | 'started' | 'completed' | 'failed', + status: 'created' | 'started' | 'completed' | 'failed' | 'skipped', overrides: Partial = {} ): StepStateRow { const baseStep = createStepStateRow(overrides); - + switch (status) { case 'created': return baseStep; - + case 'started': return { ...baseStep, status: 'started', started_at: new Date().toISOString(), }; - + case 'completed': return { ...baseStep, @@ -135,7 +147,7 @@ export function createStepInState( started_at: new Date(Date.now() - 30000).toISOString(), completed_at: new Date().toISOString(), }; - + case 'failed': return { ...baseStep, @@ -144,21 +156,31 @@ export function createStepInState( failed_at: new Date().toISOString(), error_message: 'Step failed', }; + + case 'skipped': + return { + ...baseStep, + status: 'skipped', + skipped_at: new Date().toISOString(), + skip_reason: 'condition_unmet', + }; } } /** * Creates a complete flow run state with multiple steps */ -export function createCompleteFlowState(options: { - runId?: string; - flowSlug?: string; - runStatus?: 'started' | 'completed' | 'failed'; - steps?: Array<{ - stepSlug: string; - status: 'created' | 'started' | 'completed' | 'failed'; - }>; -} = {}): { run: RunRow; steps: StepStateRow[] } { +export function createCompleteFlowState( + options: { + runId?: string; + flowSlug?: string; + runStatus?: 'started' | 'completed' | 'failed'; + steps?: Array<{ + stepSlug: string; + status: 'created' | 'started' | 'completed' | 'failed' | 'skipped'; + }>; + } = {} +): { run: RunRow; steps: StepStateRow[] } { const { runId = '123e4567-e89b-12d3-a456-426614174000', flowSlug = 'test-flow', @@ -168,13 +190,13 @@ export function createCompleteFlowState(options: { { stepSlug: 'step-2', status: 'created' }, ], } = options; - + const run = createRunInState(runStatus, { run_id: runId, flow_slug: flowSlug, - remaining_steps: steps.filter(s => s.status !== 'completed').length, + remaining_steps: steps.filter((s) => s.status !== 'completed').length, }); - + const stepStates = steps.map(({ stepSlug, status }) => createStepInState(status, { run_id: runId, @@ -182,6 +204,6 @@ export function createCompleteFlowState(options: { step_slug: stepSlug, }) ); - + return { run, steps: stepStates }; -} \ No newline at end of file +} diff --git a/pkgs/client/__tests__/types/skipped-events.test-d.ts b/pkgs/client/__tests__/types/skipped-events.test-d.ts new file mode 100644 index 000000000..0dd9ded42 --- /dev/null +++ b/pkgs/client/__tests__/types/skipped-events.test-d.ts @@ -0,0 +1,128 @@ +import { describe, it, expectTypeOf } from 'vitest'; +import { FlowStepStatus } from '../../src/lib/types'; +import type { + SkipReason, + StepEventData, + BroadcastStepSkippedEvent, + FlowStepState, +} from '../../src/lib/types'; +import { Flow, type ExtractFlowSteps } from '@pgflow/dsl'; + +// Create a sample flow for testing +const TestFlow = new Flow<{ data: string }>({ + slug: 'test_flow', + maxAttempts: 3, + baseDelay: 5, + timeout: 10, +}) + .step({ slug: 'step_a' }, (input) => ({ + result: input.data.toUpperCase(), + })) + .step({ slug: 'step_b', dependsOn: ['step_a'] }, (input) => ({ + processed: input.step_a.result, + })); + +describe('Skipped Event Type Tests', () => { + describe('SkipReason type', () => { + it('should be a union of valid skip reasons', () => { + // SkipReason should be a union of these three values + expectTypeOf().toEqualTypeOf< + 'condition_unmet' | 'handler_failed' | 'dependency_skipped' + >(); + }); + + it('should be assignable from valid skip reasons', () => { + const conditionUnmet: SkipReason = 'condition_unmet'; + const handlerFailed: SkipReason = 'handler_failed'; + const dependencySkipped: SkipReason = 'dependency_skipped'; + + expectTypeOf(conditionUnmet).toMatchTypeOf(); + expectTypeOf(handlerFailed).toMatchTypeOf(); + expectTypeOf(dependencySkipped).toMatchTypeOf(); + }); + }); + + describe('BroadcastStepSkippedEvent type', () => { + it('should have required skipped event properties', () => { + expectTypeOf().toMatchTypeOf<{ + event_type: 'step:skipped'; + run_id: string; + step_slug: string; + status: FlowStepStatus.Skipped; + skipped_at: string; + skip_reason: SkipReason; + }>(); + }); + + it('should have status as FlowStepStatus.Skipped literal', () => { + expectTypeOf< + BroadcastStepSkippedEvent['status'] + >().toEqualTypeOf(); + }); + + it('should have event_type as step:skipped literal', () => { + expectTypeOf< + BroadcastStepSkippedEvent['event_type'] + >().toEqualTypeOf<'step:skipped'>(); + }); + }); + + describe('StepEventData with skipped event', () => { + type StepSlugs = keyof ExtractFlowSteps & string; + type TestStepEventData = StepEventData; + + it('should include skipped event type in StepEventData', () => { + // StepEventData should have a 'skipped' key + expectTypeOf().toHaveProperty('skipped'); + }); + + it('should have correct skipped event structure', () => { + type SkippedEvent = TestStepEventData['skipped']; + + expectTypeOf().toMatchTypeOf<{ + event_type: 'step:skipped'; + run_id: string; + step_slug: string; + status: FlowStepStatus.Skipped; + skipped_at: string; + skip_reason: SkipReason; + }>(); + }); + + it('should type step_slug as the flow step slug type', () => { + type SkippedEvent = TestStepEventData['skipped']; + // step_slug should be typed as the specific step slugs, not just string + expectTypeOf().toMatchTypeOf(); + }); + }); + + describe('FlowStepState with skipped fields', () => { + type StepSlugs = keyof ExtractFlowSteps & string; + type TestStepState = FlowStepState; + + it('should have skipped_at field as Date | null', () => { + expectTypeOf().toHaveProperty('skipped_at'); + expectTypeOf().toEqualTypeOf(); + }); + + it('should have skip_reason field as SkipReason | null', () => { + expectTypeOf().toHaveProperty('skip_reason'); + expectTypeOf< + TestStepState['skip_reason'] + >().toEqualTypeOf(); + }); + }); + + describe('FlowStepStatus enum', () => { + it('should include Skipped status', () => { + // Verify that FlowStepStatus has a Skipped value + expectTypeOf(FlowStepStatus.Skipped).toMatchTypeOf(); + }); + + it('should have Skipped equal to string skipped', () => { + // Verify the enum value is the expected string + const skipped: 'skipped' = FlowStepStatus.Skipped; + expectTypeOf().toEqualTypeOf<'skipped'>(); + }); + }); +}); diff --git a/pkgs/client/__tests__/unit/FlowStep.test.ts b/pkgs/client/__tests__/unit/FlowStep.test.ts index 67a8db6e8..0e2dce59e 100644 --- a/pkgs/client/__tests__/unit/FlowStep.test.ts +++ b/pkgs/client/__tests__/unit/FlowStep.test.ts @@ -10,6 +10,7 @@ import { createStepStartedEvent, createStepCompletedEvent, createStepFailedEvent, + createStepSkippedEvent, } from '../helpers/event-factories'; import { createFlowStep } from '../helpers/state-factories'; // Test scenarios have been inlined for clarity @@ -161,6 +162,74 @@ describe('FlowStep', () => { // They only have error_message as strings (already verified above) }); + test('handles skipped event correctly', () => { + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + }); + + const skippedEvent = createStepSkippedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + skip_reason: 'condition_unmet', + }); + + // Set up event tracking + const allTracker = createEventTracker(); + const skippedTracker = createEventTracker(); + step.on('*', allTracker.callback); + step.on('skipped', skippedTracker.callback); + + // Update state and verify + const result = step.updateState(skippedEvent); + expect(result).toBe(true); + + // Check state was updated correctly + expect(step.status).toBe(FlowStepStatus.Skipped); + expect(step.skipped_at).toBeInstanceOf(Date); + expect(step.skip_reason).toBe('condition_unmet'); + // Skipped steps have null output per Q1 design decision + expect(step.output).toBeNull(); + + // Verify events were emitted with comprehensive matchers + expect(allTracker).toHaveReceivedTotalEvents(1); + expect(skippedTracker).toHaveReceivedEvent('step:skipped'); + expect(allTracker).toHaveReceivedEvent('step:skipped', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Skipped, + skip_reason: 'condition_unmet', + }); + expect(allTracker).toNotHaveReceivedEvent('step:completed'); + expect(allTracker).toNotHaveReceivedEvent('step:failed'); + expect(allTracker).toNotHaveReceivedEvent('step:started'); + }); + + test('handles skipped event with different skip reasons', () => { + const skipReasons = [ + 'condition_unmet', + 'handler_failed', + 'dependency_skipped', + ] as const; + + for (const skipReason of skipReasons) { + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + }); + + const skippedEvent = createStepSkippedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + skip_reason: skipReason, + }); + + const result = step.updateState(skippedEvent); + expect(result).toBe(true); + expect(step.skip_reason).toBe(skipReason); + } + }); + test('handles events with missing fields gracefully', () => { const step = new FlowStep({ run_id: RUN_ID, @@ -172,6 +241,8 @@ describe('FlowStep', () => { started_at: null, completed_at: null, failed_at: null, + skipped_at: null, + skip_reason: null, }); // Incomplete failed event without timestamps @@ -179,12 +250,12 @@ describe('FlowStep', () => { run_id: RUN_ID, step_slug: STEP_SLUG, status: FlowStepStatus.Failed, - error_message: 'Something went wrong' + error_message: 'Something went wrong', }; - + // Should still update the state correctly step.updateState(incompleteEvent as any); - + expect(step.status).toBe(FlowStepStatus.Failed); expect(step.failed_at).toBeInstanceOf(Date); expect(step.error_message).toBe('Something went wrong'); @@ -280,14 +351,14 @@ describe('FlowStep', () => { error_message: 'Attempt to override completed step', }); const result = step.updateState(failedEvent); - + // Should not update terminal state expect(result).toBe(false); expect(step.status).toBe(FlowStepStatus.Completed); - + // Output should remain unchanged expect(step.output).toEqual({ step_result: 'success' }); - + // Error fields should remain null expect(step.error).toBeNull(); expect(step.error_message).toBeNull(); @@ -316,13 +387,59 @@ describe('FlowStep', () => { // Try to update with a new completed event const result = step.updateState(newCompletedEvent); - + // Should not update terminal state even with same status expect(result).toBe(false); - + // Output should remain unchanged expect(step.output).toEqual({ step_result: 'success' }); }); + + test('protects skipped terminal state from subsequent updates', () => { + // Create a step in skipped state + const step = new FlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + status: FlowStepStatus.Skipped, + output: null, + error: null, + error_message: null, + started_at: null, + completed_at: null, + failed_at: null, + skipped_at: new Date(), + skip_reason: 'condition_unmet', + }); + + // Try to update to started state + const startedEvent = createStepStartedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + }); + const result = step.updateState(startedEvent); + + // Should not update terminal skipped state + expect(result).toBe(false); + expect(step.status).toBe(FlowStepStatus.Skipped); + + // Try to update to completed state + const completedEvent = createStepCompletedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + output: { result: 'success' }, + }); + expect(step.updateState(completedEvent)).toBe(false); + expect(step.status).toBe(FlowStepStatus.Skipped); + + // Try to update to failed state + const failedEvent = createStepFailedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + error_message: 'Attempt to override skipped step', + }); + expect(step.updateState(failedEvent)).toBe(false); + expect(step.status).toBe(FlowStepStatus.Skipped); + }); }); describe('Step Event Lifecycles', () => { @@ -352,7 +469,10 @@ describe('FlowStep', () => { // Verify exact event sequence expect(tracker).toHaveReceivedTotalEvents(2); - expect(tracker).toHaveReceivedEventSequence(['step:started', 'step:completed']); + expect(tracker).toHaveReceivedEventSequence([ + 'step:started', + 'step:completed', + ]); expect(tracker).toHaveReceivedInOrder('step:started', 'step:completed'); expect(tracker).toNotHaveReceivedEvent('step:failed'); @@ -396,7 +516,10 @@ describe('FlowStep', () => { // Verify exact event sequence expect(tracker).toHaveReceivedTotalEvents(2); - expect(tracker).toHaveReceivedEventSequence(['step:started', 'step:failed']); + expect(tracker).toHaveReceivedEventSequence([ + 'step:started', + 'step:failed', + ]); expect(tracker).toHaveReceivedInOrder('step:started', 'step:failed'); expect(tracker).toNotHaveReceivedEvent('step:completed'); @@ -409,6 +532,41 @@ describe('FlowStep', () => { }); }); + test('skipped step: skipped ONLY (no started event)', () => { + // Skipped steps go directly from created to skipped without starting + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + status: FlowStepStatus.Created, + }); + + const tracker = createEventTracker(); + step.on('*', tracker.callback); + + // Simulate skipped event (no started event for skipped steps) + const skippedEvent = createStepSkippedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + skip_reason: 'condition_unmet', + }); + step.updateState(skippedEvent); + + // Verify ONLY skipped event was emitted + expect(tracker).toHaveReceivedTotalEvents(1); + expect(tracker).toNotHaveReceivedEvent('step:started'); + expect(tracker).toNotHaveReceivedEvent('step:completed'); + expect(tracker).toNotHaveReceivedEvent('step:failed'); + expect(tracker).toHaveReceivedEvent('step:skipped'); + + // Verify skipped payload + expect(tracker).toHaveReceivedEvent('step:skipped', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Skipped, + skip_reason: 'condition_unmet', + }); + }); + test('empty map step: completed ONLY (no started event)', () => { // Empty maps skip started and go straight to completed const step = createFlowStep({ @@ -589,7 +747,7 @@ describe('FlowStep', () => { // Create a promise that should resolve when the status is updated const waitPromise = step.waitForStatus(FlowStepStatus.Completed); - + // Update the status after a delay setTimeout(() => { const completedEvent = createStepCompletedEvent({ @@ -602,7 +760,7 @@ describe('FlowStep', () => { // Advance timers to trigger the update await advanceTimersAndFlush(1000); - + // Wait for the promise to resolve const result = await waitPromise; expect(result).toBe(step); @@ -623,14 +781,18 @@ describe('FlowStep', () => { }); // Should timeout after 5000ms (default is 5min, but we'll use a shorter timeout) - const waitPromise = step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 }); - + const waitPromise = step.waitForStatus(FlowStepStatus.Completed, { + timeoutMs: 5000, + }); + // Immediately add catch handler to avoid unhandled rejection - const expectPromise = expect(waitPromise).rejects.toThrow(/Timeout waiting for step/); - + const expectPromise = expect(waitPromise).rejects.toThrow( + /Timeout waiting for step/ + ); + // Advance timers past the timeout await advanceTimersAndFlush(5001); - + // Wait for the expectation to complete await expectPromise; }); @@ -651,7 +813,7 @@ describe('FlowStep', () => { // Should resolve immediately since already in completed status const waitPromise = step.waitForStatus(FlowStepStatus.Completed); const result = await waitPromise; - + expect(result).toBe(step); }); @@ -669,19 +831,21 @@ describe('FlowStep', () => { }); const controller = new AbortController(); - const waitPromise = step.waitForStatus(FlowStepStatus.Completed, { - signal: controller.signal + const waitPromise = step.waitForStatus(FlowStepStatus.Completed, { + signal: controller.signal, }); - + // Immediately add catch handler to avoid unhandled rejection - const expectPromise = expect(waitPromise).rejects.toThrow(/Aborted waiting for step/); - + const expectPromise = expect(waitPromise).rejects.toThrow( + /Aborted waiting for step/ + ); + // Abort the operation setTimeout(() => controller.abort(), 1000); - + // Advance timers to trigger the abort await advanceTimersAndFlush(1000); - + // Wait for the expectation to complete await expectPromise; }); @@ -700,8 +864,10 @@ describe('FlowStep', () => { }); // Create a promise that should resolve if status is reached before timeout - const waitPromise = step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 }); - + const waitPromise = step.waitForStatus(FlowStepStatus.Completed, { + timeoutMs: 5000, + }); + // Update status before timeout setTimeout(() => { const completedEvent = createStepCompletedEvent({ @@ -711,10 +877,10 @@ describe('FlowStep', () => { }); step.updateState(completedEvent); }, 1000); - + // Advance timers partway await advanceTimersAndFlush(1000); - + // The promise should resolve const result = await waitPromise; expect(result).toBe(step); @@ -736,7 +902,7 @@ describe('FlowStep', () => { // Test waiting for started status const startedPromise = step.waitForStatus(FlowStepStatus.Started); - + // Update to started setTimeout(() => { const startedEvent = createStepStartedEvent({ @@ -745,15 +911,15 @@ describe('FlowStep', () => { }); step.updateState(startedEvent); }, 1000); - + await advanceTimersAndFlush(1000); - + const startedResult = await startedPromise; expect(startedResult.status).toBe(FlowStepStatus.Started); - + // Test waiting for failed status const failedPromise = step.waitForStatus(FlowStepStatus.Failed); - + // Update to failed setTimeout(() => { const failedEvent = createStepFailedEvent({ @@ -763,9 +929,9 @@ describe('FlowStep', () => { }); step.updateState(failedEvent); }, 1000); - + await advanceTimersAndFlush(1000); - + const failedResult = await failedPromise; expect(failedResult.status).toBe(FlowStepStatus.Failed); }); @@ -780,8 +946,12 @@ describe('FlowStep', () => { }); // Start waiting for 'started' status - const waitPromise = step.waitForStatus(FlowStepStatus.Started, { timeoutMs: 2000 }); - const expectPromise = expect(waitPromise).rejects.toThrow(/Timeout waiting for step/); + const waitPromise = step.waitForStatus(FlowStepStatus.Started, { + timeoutMs: 2000, + }); + const expectPromise = expect(waitPromise).rejects.toThrow( + /Timeout waiting for step/ + ); // Simulate empty map: goes directly to completed WITHOUT started event setTimeout(() => { @@ -812,7 +982,9 @@ describe('FlowStep', () => { }); // Start waiting for completed - const waitPromise = step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 }); + const waitPromise = step.waitForStatus(FlowStepStatus.Completed, { + timeoutMs: 5000, + }); // Simulate empty map: goes directly to completed setTimeout(() => { @@ -841,7 +1013,9 @@ describe('FlowStep', () => { }); // Normal steps should successfully wait for 'started' - const waitPromise = step.waitForStatus(FlowStepStatus.Started, { timeoutMs: 5000 }); + const waitPromise = step.waitForStatus(FlowStepStatus.Started, { + timeoutMs: 5000, + }); // Simulate normal step: sends started event setTimeout(() => { @@ -858,5 +1032,35 @@ describe('FlowStep', () => { const result = await waitPromise; expect(result.status).toBe(FlowStepStatus.Started); }); + + test('waitForStatus(Skipped) resolves when step is skipped', async () => { + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + status: FlowStepStatus.Created, + }); + + // Start waiting for 'skipped' status + const waitPromise = step.waitForStatus(FlowStepStatus.Skipped, { + timeoutMs: 5000, + }); + + // Simulate skipped event + setTimeout(() => { + const skippedEvent = createStepSkippedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + skip_reason: 'condition_unmet', + }); + step.updateState(skippedEvent); + }, 100); + + await advanceTimersAndFlush(100); + + // Should resolve successfully + const result = await waitPromise; + expect(result.status).toBe(FlowStepStatus.Skipped); + expect(result.skip_reason).toBe('condition_unmet'); + }); }); -}); \ No newline at end of file +}); diff --git a/pkgs/client/src/lib/FlowRun.ts b/pkgs/client/src/lib/FlowRun.ts index 891c75f96..fe9e8f9ee 100644 --- a/pkgs/client/src/lib/FlowRun.ts +++ b/pkgs/client/src/lib/FlowRun.ts @@ -169,6 +169,8 @@ export class FlowRun started_at: null, completed_at: null, failed_at: null, + skipped_at: null, + skip_reason: null, }); // Cache the step @@ -277,7 +279,9 @@ export class FlowRun this.#state.input = row.input as ExtractFlowInput; this.#state.output = row.output as ExtractFlowOutput | null; this.#state.started_at = row.started_at ? new Date(row.started_at) : null; - this.#state.completed_at = row.completed_at ? new Date(row.completed_at) : null; + this.#state.completed_at = row.completed_at + ? new Date(row.completed_at) + : null; this.#state.failed_at = row.failed_at ? new Date(row.failed_at) : null; this.#state.remaining_steps = row.remaining_steps; this.#state.error_message = null; // Database doesn't have error_message for runs diff --git a/pkgs/client/src/lib/FlowStep.ts b/pkgs/client/src/lib/FlowStep.ts index 717d62bd7..20003d7df 100644 --- a/pkgs/client/src/lib/FlowStep.ts +++ b/pkgs/client/src/lib/FlowStep.ts @@ -1,12 +1,13 @@ import { createNanoEvents } from 'nanoevents'; import type { AnyFlow, ExtractFlowSteps, StepOutput } from '@pgflow/dsl'; import { FlowStepStatus } from './types.js'; -import type { - FlowStepState, - StepEvents, - Unsubscribe, +import type { + FlowStepState, + StepEvents, + Unsubscribe, FlowStepBase, - StepEvent + StepEvent, + SkipReason, } from './types.js'; /** @@ -15,7 +16,8 @@ import type { export class FlowStep< TFlow extends AnyFlow, TStepSlug extends keyof ExtractFlowSteps & string -> implements FlowStepBase> { +> implements FlowStepBase> +{ #state: FlowStepState; #events = createNanoEvents>(); #statusPrecedence: Record = { @@ -23,11 +25,12 @@ export class FlowStep< [FlowStepStatus.Started]: 1, [FlowStepStatus.Completed]: 2, [FlowStepStatus.Failed]: 3, + [FlowStepStatus.Skipped]: 4, }; /** * Creates a new FlowStep instance - * + * * @param initialState - Initial state for the step */ constructor(initialState: FlowStepState) { @@ -76,6 +79,20 @@ export class FlowStep< return this.#state.failed_at; } + /** + * Get the skipped_at timestamp + */ + get skipped_at(): Date | null { + return this.#state.skipped_at; + } + + /** + * Get the skip reason + */ + get skip_reason(): SkipReason | null { + return this.#state.skip_reason; + } + /** * Get the step output */ @@ -99,7 +116,7 @@ export class FlowStep< /** * Register an event handler for a step event - * + * * @param event - Event type to listen for * @param callback - Callback function to execute when event is emitted * @returns Function to unsubscribe from the event @@ -113,13 +130,17 @@ export class FlowStep< /** * Wait for the step to reach a specific status - * + * * @param targetStatus - The status to wait for * @param options - Optional timeout and abort signal * @returns Promise that resolves with the step instance when the status is reached */ waitForStatus( - targetStatus: FlowStepStatus.Started | FlowStepStatus.Completed | FlowStepStatus.Failed, + targetStatus: + | FlowStepStatus.Started + | FlowStepStatus.Completed + | FlowStepStatus.Failed + | FlowStepStatus.Skipped, options?: { timeoutMs?: number; signal?: AbortSignal } ): Promise { const timeoutMs = options?.timeoutMs ?? 5 * 60 * 1000; // Default 5 minutes @@ -134,17 +155,21 @@ export class FlowStep< return new Promise((resolve, reject) => { let timeoutId: NodeJS.Timeout | undefined; let cleanedUp = false; - + // Set up timeout if provided if (timeoutMs > 0) { timeoutId = setTimeout(() => { if (cleanedUp) return; // Prevent firing if already cleaned up cleanedUp = true; unbind(); - reject(new Error(`Timeout waiting for step ${this.step_slug} to reach status '${targetStatus}'`)); + reject( + new Error( + `Timeout waiting for step ${this.step_slug} to reach status '${targetStatus}'` + ) + ); }, timeoutMs); } - + // Set up abort signal if provided let abortCleanup: (() => void) | undefined; if (signal) { @@ -153,15 +178,19 @@ export class FlowStep< cleanedUp = true; if (timeoutId) clearTimeout(timeoutId); unbind(); - reject(new Error(`Aborted waiting for step ${this.step_slug} to reach status '${targetStatus}'`)); + reject( + new Error( + `Aborted waiting for step ${this.step_slug} to reach status '${targetStatus}'` + ) + ); }; - + signal.addEventListener('abort', abortHandler); abortCleanup = () => { signal.removeEventListener('abort', abortHandler); }; } - + // Subscribe to all events const unbind = this.on('*', (event) => { if (event.status === targetStatus) { @@ -187,10 +216,14 @@ export class FlowStep< // Direct state assignment from database row (no event conversion) this.#state.status = row.status as FlowStepStatus; this.#state.started_at = row.started_at ? new Date(row.started_at) : null; - this.#state.completed_at = row.completed_at ? new Date(row.completed_at) : null; + this.#state.completed_at = row.completed_at + ? new Date(row.completed_at) + : null; this.#state.failed_at = row.failed_at ? new Date(row.failed_at) : null; this.#state.error_message = row.error_message; this.#state.error = row.error_message ? new Error(row.error_message) : null; + this.#state.skipped_at = row.skipped_at ? new Date(row.skipped_at) : null; + this.#state.skip_reason = row.skip_reason as SkipReason | null; // Note: output is not stored in step_states table, remains null } @@ -209,12 +242,12 @@ export class FlowStep< if (event.step_slug !== this.#state.step_slug) { return false; } - + // Validate event is for this run if (event.run_id !== this.#state.run_id) { return false; } - + // Check if the event status has higher precedence than current status if (!this.#shouldUpdateStatus(this.#state.status, event.status)) { return false; @@ -226,7 +259,10 @@ export class FlowStep< this.#state = { ...this.#state, status: FlowStepStatus.Started, - started_at: typeof event.started_at === 'string' ? new Date(event.started_at) : new Date(), + started_at: + typeof event.started_at === 'string' + ? new Date(event.started_at) + : new Date(), }; this.#events.emit('started', event); break; @@ -235,7 +271,10 @@ export class FlowStep< this.#state = { ...this.#state, status: FlowStepStatus.Completed, - completed_at: typeof event.completed_at === 'string' ? new Date(event.completed_at) : new Date(), + completed_at: + typeof event.completed_at === 'string' + ? new Date(event.completed_at) + : new Date(), output: event.output as StepOutput, }; this.#events.emit('completed', event); @@ -245,13 +284,36 @@ export class FlowStep< this.#state = { ...this.#state, status: FlowStepStatus.Failed, - failed_at: typeof event.failed_at === 'string' ? new Date(event.failed_at) : new Date(), - error_message: typeof event.error_message === 'string' ? event.error_message : 'Unknown error', - error: new Error(typeof event.error_message === 'string' ? event.error_message : 'Unknown error'), + failed_at: + typeof event.failed_at === 'string' + ? new Date(event.failed_at) + : new Date(), + error_message: + typeof event.error_message === 'string' + ? event.error_message + : 'Unknown error', + error: new Error( + typeof event.error_message === 'string' + ? event.error_message + : 'Unknown error' + ), }; this.#events.emit('failed', event); break; + case FlowStepStatus.Skipped: + this.#state = { + ...this.#state, + status: FlowStepStatus.Skipped, + skipped_at: + typeof event.skipped_at === 'string' + ? new Date(event.skipped_at) + : new Date(), + skip_reason: event.skip_reason, + }; + this.#events.emit('skipped', event); + break; + default: { // Exhaustiveness check - ensures all event statuses are handled event satisfies never; @@ -261,27 +323,34 @@ export class FlowStep< // Also emit to the catch-all listener this.#events.emit('*', event); - + return true; } /** * Determines if a status should be updated based on precedence - * + * * @param currentStatus - Current status * @param newStatus - New status * @returns true if the status should be updated, false otherwise */ - #shouldUpdateStatus(currentStatus: FlowStepStatus, newStatus: FlowStepStatus): boolean { + #shouldUpdateStatus( + currentStatus: FlowStepStatus, + newStatus: FlowStepStatus + ): boolean { // Don't allow changes to terminal states - if (currentStatus === FlowStepStatus.Completed || currentStatus === FlowStepStatus.Failed) { + if ( + currentStatus === FlowStepStatus.Completed || + currentStatus === FlowStepStatus.Failed || + currentStatus === FlowStepStatus.Skipped + ) { return false; // Terminal states should never change } - + const currentPrecedence = this.#statusPrecedence[currentStatus]; const newPrecedence = this.#statusPrecedence[newStatus]; // Only allow transitions to higher precedence non-terminal status return newPrecedence > currentPrecedence; } -} \ No newline at end of file +} diff --git a/pkgs/client/src/lib/eventAdapters.ts b/pkgs/client/src/lib/eventAdapters.ts index 95e57462a..8bbefe213 100644 --- a/pkgs/client/src/lib/eventAdapters.ts +++ b/pkgs/client/src/lib/eventAdapters.ts @@ -6,15 +6,13 @@ import type { StepOutput, } from '@pgflow/dsl'; import type { RunRow, StepStateRow } from '@pgflow/core'; -import { - FlowStepStatus, - FlowRunStatus, -} from './types.js'; +import { FlowStepStatus, FlowRunStatus } from './types.js'; import type { BroadcastRunEvent, BroadcastStepEvent, FlowRunEvent, StepEvent, + SkipReason, } from './types.js'; /** @@ -60,7 +58,7 @@ export function toTypedRunEvent( */ export function toTypedStepEvent< TFlow extends AnyFlow, - TStepSlug extends keyof ExtractFlowSteps & string, + TStepSlug extends keyof ExtractFlowSteps & string >(evt: BroadcastStepEvent): StepEvent { switch (evt.status) { case FlowStepStatus.Started: @@ -89,6 +87,15 @@ export function toTypedStepEvent< failed_at: evt.failed_at, error_message: evt.error_message, }; + case FlowStepStatus.Skipped: + return { + event_type: 'step:skipped', + run_id: evt.run_id, + step_slug: evt.step_slug as TStepSlug, + status: FlowStepStatus.Skipped, + skipped_at: evt.skipped_at, + skip_reason: evt.skip_reason, + }; } } @@ -137,7 +144,7 @@ export function runRowToTypedEvent( */ export function stepStateRowToTypedEvent< TFlow extends AnyFlow, - TStepSlug extends keyof ExtractFlowSteps & string, + TStepSlug extends keyof ExtractFlowSteps & string >(row: StepStateRow): StepEvent { switch (row.status) { case 'created': @@ -167,7 +174,16 @@ export function stepStateRowToTypedEvent< failed_at: row.failed_at!, error_message: row.error_message || 'Step failed', }; + case 'skipped': + return { + event_type: 'step:skipped', + run_id: row.run_id, + step_slug: row.step_slug as TStepSlug, + status: FlowStepStatus.Skipped, + skipped_at: row.skipped_at!, + skip_reason: (row.skip_reason as SkipReason) ?? 'condition_unmet', + }; default: throw new Error(`Unknown step status: ${row.status}`); } -} \ No newline at end of file +} diff --git a/pkgs/client/src/lib/types.ts b/pkgs/client/src/lib/types.ts index 015ae7342..d35a60399 100644 --- a/pkgs/client/src/lib/types.ts +++ b/pkgs/client/src/lib/types.ts @@ -125,8 +125,17 @@ export enum FlowStepStatus { Started = 'started', Completed = 'completed', Failed = 'failed', + Skipped = 'skipped', } +/** + * Reason why a step was skipped + */ +export type SkipReason = + | 'condition_unmet' + | 'handler_failed' + | 'dependency_skipped'; + /** * Step event data types (no circular reference) */ @@ -157,6 +166,14 @@ export type StepEventData< status: FlowStepStatus.Failed; failed_at: string; }; + skipped: { + event_type: 'step:skipped'; + run_id: string; + step_slug: TStepSlug; + status: FlowStepStatus.Skipped; + skipped_at: string; + skip_reason: SkipReason; + }; }; /** @@ -182,7 +199,8 @@ export function isStepEvent< 'status' in value && (value.status === FlowStepStatus.Started || value.status === FlowStepStatus.Completed || - value.status === FlowStepStatus.Failed) + value.status === FlowStepStatus.Failed || + value.status === FlowStepStatus.Skipped) ); } @@ -231,6 +249,21 @@ export function isStepFailedEvent< ); } +/** + * Type guard for skipped step events + */ +export function isStepSkippedEvent< + TFlow extends AnyFlow, + TStepSlug extends keyof ExtractFlowSteps & string +>(event: unknown): event is StepEventData['skipped'] { + return ( + isStepEvent(event) && + event.status === FlowStepStatus.Skipped && + 'event_type' in event && + event.event_type === 'step:skipped' + ); +} + /** * Step event types matching nanoevents expectations (wildcard added separately) */ @@ -323,10 +356,22 @@ export type BroadcastStepFailedEvent = { output?: Json; // Adding for type compatibility }; +export type BroadcastStepSkippedEvent = { + event_type: 'step:skipped'; + run_id: string; + step_slug: string; + status: FlowStepStatus.Skipped; + skipped_at: string; + skip_reason: SkipReason; + error_message?: string; // Adding for type compatibility + output?: Json; // Adding for type compatibility +}; + export type BroadcastStepEvent = | BroadcastStepStartedEvent | BroadcastStepCompletedEvent - | BroadcastStepFailedEvent; + | BroadcastStepFailedEvent + | BroadcastStepSkippedEvent; /** * Flow run state @@ -361,6 +406,8 @@ export type FlowStepState< started_at: Date | null; completed_at: Date | null; failed_at: Date | null; + skipped_at: Date | null; + skip_reason: SkipReason | null; }; /**