diff --git a/pkgs/edge-worker/tests/integration/flow/_conditionalHelpers.ts b/pkgs/edge-worker/tests/integration/flow/_conditionalHelpers.ts new file mode 100644 index 000000000..f6067b28b --- /dev/null +++ b/pkgs/edge-worker/tests/integration/flow/_conditionalHelpers.ts @@ -0,0 +1,65 @@ +import type { postgres } from '../../sql.ts'; +import { compileFlow } from '@pgflow/dsl'; +import type { AnyFlow } from '@pgflow/dsl'; + +// ============= Test Helpers for Conditional Flow Integration Tests ============= + +/** + * Compiles a Flow and executes SQL statements to create it in the database. + */ +export const createFlowInDb = async (sql: postgres.Sql, flow: AnyFlow) => { + const statements = compileFlow(flow); + for (const stmt of statements) { + await sql.unsafe(stmt); + } +}; + +/** + * Extended step state info including skip details + */ +export interface StepStateWithSkip { + step_slug: string; + status: string; + skip_reason: string | null; + skipped_at: string | null; +} + +/** + * Get step states with skip information + */ +export const getStepStatesWithSkip = async ( + sql: postgres.Sql, + runId: string +): Promise => { + return await sql` + SELECT step_slug, status, skip_reason, skipped_at + FROM pgflow.step_states + WHERE run_id = ${runId} + ORDER BY step_slug; + `; +}; + +/** + * Extended task info including error details + */ +export interface TaskWithError { + step_slug: string; + status: string; + error_message: string | null; + attempts_count: number; +} + +/** + * Get step tasks with error information + */ +export const getStepTasksWithError = async ( + sql: postgres.Sql, + runId: string +): Promise => { + return await sql` + SELECT step_slug, status, error_message, attempts_count + FROM pgflow.step_tasks + WHERE run_id = ${runId} + ORDER BY step_slug, task_index; + `; +}; diff --git a/pkgs/edge-worker/tests/integration/flow/conditionalFlow.test.ts b/pkgs/edge-worker/tests/integration/flow/conditionalFlow.test.ts new file mode 100644 index 000000000..0a1ac5b41 --- /dev/null +++ b/pkgs/edge-worker/tests/integration/flow/conditionalFlow.test.ts @@ -0,0 +1,571 @@ +import { assert, assertEquals } from '@std/assert'; +import { withPgNoTransaction } from '../../db.ts'; +import { Flow } from '@pgflow/dsl'; +import { delay } from '@std/async'; +import { startFlow, startWorker } from '../_helpers.ts'; +import { waitForRunCompletion } from './_testHelpers.ts'; +import { + createFlowInDb, + getStepStatesWithSkip, +} from './_conditionalHelpers.ts'; + +// Common worker config for all tests +const workerConfig = { + maxConcurrent: 1, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 200, +} as const; + +// ============================================================================= +// Test 1: Step with 'if' condition - condition met (step runs) +// ============================================================================= +Deno.test( + 'conditional if - condition met runs step', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const stepWasCalled: Record = { + premium_feature: false, + }; + + const ConditionalIfMetFlow = new Flow<{ premium: boolean }>({ + slug: 'test_conditional_if_met', + }) + .step({ slug: 'base' }, async (input) => { + await delay(1); + return { premium: input.premium }; + }) + .step( + { + slug: 'premium_feature', + dependsOn: ['base'], + if: { base: { premium: true } }, + }, + async (deps) => { + stepWasCalled['premium_feature'] = true; + await delay(1); + return { accessed: true, from: deps.base }; + } + ); + + const worker = startWorker(sql, ConditionalIfMetFlow, workerConfig); + + try { + await createFlowInDb(sql, ConditionalIfMetFlow); + + const flowRun = await startFlow(sql, ConditionalIfMetFlow, { + premium: true, + }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'completed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + assertEquals(stepStates.length, 2); + for (const state of stepStates) { + assertEquals(state.status, 'completed'); + assertEquals(state.skip_reason, null); + assertEquals(state.skipped_at, null); + } + + // Only leaf steps (steps with no dependents) that completed are included in output + // 'base' is not a leaf (has dependent 'premium_feature'), so only 'premium_feature' appears + assertEquals(polledRun.output, { + premium_feature: { accessed: true, from: { premium: true } }, + }); + + assertEquals( + stepWasCalled['premium_feature'], + true, + 'premium_feature should have been called' + ); + } finally { + await worker.stop(); + } + }) +); + +// ============================================================================= +// Test 2: Step with 'if' condition - condition NOT met (step skipped) +// ============================================================================= +Deno.test( + 'conditional if - condition unmet skips step with condition_unmet', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const stepWasCalled: Record = { + premium_feature: false, + }; + + const ConditionalIfUnmetFlow = new Flow<{ premium: boolean }>({ + slug: 'test_conditional_if_unmet', + }) + .step({ slug: 'base' }, async (input) => { + await delay(1); + return { premium: input.premium }; + }) + .step( + { + slug: 'premium_feature', + dependsOn: ['base'], + if: { base: { premium: true } }, + }, + async (deps) => { + stepWasCalled['premium_feature'] = true; + await delay(1); + return { accessed: true, from: deps.base }; + } + ); + + const worker = startWorker(sql, ConditionalIfUnmetFlow, workerConfig); + + try { + await createFlowInDb(sql, ConditionalIfUnmetFlow); + + const flowRun = await startFlow(sql, ConditionalIfUnmetFlow, { + premium: false, + }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'completed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + assertEquals(stepStates.length, 2); + + const baseState = stepStates.find((s) => s.step_slug === 'base'); + assertEquals(baseState?.status, 'completed'); + assertEquals(baseState?.skip_reason, null); + + const premiumState = stepStates.find( + (s) => s.step_slug === 'premium_feature' + ); + assertEquals(premiumState?.status, 'skipped'); + assertEquals(premiumState?.skip_reason, 'condition_unmet'); + assert(premiumState?.skipped_at !== null, 'skipped_at should be set'); + + // Only leaf steps that completed are included in output + // 'base' is not a leaf, and 'premium_feature' (the only leaf) was skipped + // so output is null (jsonb_object_agg returns null for empty set) + assertEquals(polledRun.output, null); + + assertEquals( + stepWasCalled['premium_feature'], + false, + 'premium_feature should NOT have been called' + ); + } finally { + await worker.stop(); + } + }) +); + +// ============================================================================= +// Test 3: Step with 'ifNot' condition - forbidden pattern absent (step runs) +// ============================================================================= +Deno.test( + 'conditional ifNot - forbidden pattern absent runs step', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const stepWasCalled: Record = { + optional_feature: false, + }; + + const ConditionalIfNotMetFlow = new Flow<{ disabled: boolean }>({ + slug: 'test_conditional_ifnot_met', + }) + .step({ slug: 'base' }, async (input) => { + await delay(1); + return { disabled: input.disabled }; + }) + .step( + { + slug: 'optional_feature', + dependsOn: ['base'], + ifNot: { base: { disabled: true } }, + }, + async (deps) => { + stepWasCalled['optional_feature'] = true; + await delay(1); + return { ran: true, from: deps.base }; + } + ); + + const worker = startWorker(sql, ConditionalIfNotMetFlow, workerConfig); + + try { + await createFlowInDb(sql, ConditionalIfNotMetFlow); + + const flowRun = await startFlow(sql, ConditionalIfNotMetFlow, { + disabled: false, + }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'completed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + assertEquals(stepStates.length, 2); + for (const state of stepStates) { + assertEquals(state.status, 'completed'); + assertEquals(state.skip_reason, null); + } + + // Only leaf steps (steps with no dependents) that completed are included in output + // 'base' is not a leaf (has dependent 'optional_feature'), so only 'optional_feature' appears + assertEquals(polledRun.output, { + optional_feature: { ran: true, from: { disabled: false } }, + }); + + assertEquals( + stepWasCalled['optional_feature'], + true, + 'optional_feature should have been called' + ); + } finally { + await worker.stop(); + } + }) +); + +// ============================================================================= +// Test 4: Step with 'ifNot' condition - forbidden pattern present (step skipped) +// ============================================================================= +Deno.test( + 'conditional ifNot - forbidden pattern present skips step', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const stepWasCalled: Record = { + optional_feature: false, + }; + + const ConditionalIfNotUnmetFlow = new Flow<{ disabled: boolean }>({ + slug: 'test_conditional_ifnot_unmet', + }) + .step({ slug: 'base' }, async (input) => { + await delay(1); + return { disabled: input.disabled }; + }) + .step( + { + slug: 'optional_feature', + dependsOn: ['base'], + ifNot: { base: { disabled: true } }, + }, + async (deps) => { + stepWasCalled['optional_feature'] = true; + await delay(1); + return { ran: true, from: deps.base }; + } + ); + + const worker = startWorker(sql, ConditionalIfNotUnmetFlow, workerConfig); + + try { + await createFlowInDb(sql, ConditionalIfNotUnmetFlow); + + const flowRun = await startFlow(sql, ConditionalIfNotUnmetFlow, { + disabled: true, + }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'completed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + const optionalState = stepStates.find( + (s) => s.step_slug === 'optional_feature' + ); + assertEquals(optionalState?.status, 'skipped'); + assertEquals(optionalState?.skip_reason, 'condition_unmet'); + assert(optionalState?.skipped_at !== null); + + assertEquals( + stepWasCalled['optional_feature'], + false, + 'optional_feature should NOT have been called' + ); + } finally { + await worker.stop(); + } + }) +); + +// ============================================================================= +// Test 5: Skip cascades - dependent step also skipped when parent skipped +// ============================================================================= +Deno.test( + 'conditional skip cascades to dependent steps', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const stepWasCalled: Record = { + premium_feature: false, + depends_on_premium: false, + }; + + const SkipCascadeFlow = new Flow<{ premium: boolean }>({ + slug: 'test_skip_cascade', + }) + .step({ slug: 'base' }, async (input) => { + await delay(1); + return { premium: input.premium }; + }) + .step( + { + slug: 'premium_feature', + dependsOn: ['base'], + if: { base: { premium: true } }, + whenUnmet: 'skip-cascade', // cascade skip to dependents + }, + async (_deps) => { + stepWasCalled['premium_feature'] = true; + await delay(1); + return { accessed: true }; + } + ) + .step( + { slug: 'depends_on_premium', dependsOn: ['premium_feature'] }, + async (deps) => { + stepWasCalled['depends_on_premium'] = true; + await delay(1); + return { enriched: true, from: deps.premium_feature }; + } + ); + + const worker = startWorker(sql, SkipCascadeFlow, workerConfig); + + try { + await createFlowInDb(sql, SkipCascadeFlow); + + const flowRun = await startFlow(sql, SkipCascadeFlow, { + premium: false, + }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'completed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + + const baseState = stepStates.find((s) => s.step_slug === 'base'); + assertEquals(baseState?.status, 'completed'); + + const premiumState = stepStates.find( + (s) => s.step_slug === 'premium_feature' + ); + assertEquals(premiumState?.status, 'skipped'); + assertEquals(premiumState?.skip_reason, 'condition_unmet'); + + const dependentState = stepStates.find( + (s) => s.step_slug === 'depends_on_premium' + ); + assertEquals(dependentState?.status, 'skipped'); + assertEquals(dependentState?.skip_reason, 'dependency_skipped'); + + assert( + dependentState?.skipped_at !== null, + 'skipped_at should be set for dependent step' + ); + + assertEquals( + stepWasCalled['premium_feature'], + false, + 'premium_feature should NOT have been called' + ); + assertEquals( + stepWasCalled['depends_on_premium'], + false, + 'depends_on_premium should NOT have been called' + ); + } finally { + await worker.stop(); + } + }) +); + +// ============================================================================= +// Test 6: Non-cascade skip - downstream step still runs when dependency optional +// ============================================================================= +Deno.test( + 'non-cascade skip - downstream runs with optional dependency skipped', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const stepWasCalled: Record = { + optional_enrichment: false, + final_step: false, + }; + + const NonCascadeSkipFlow = new Flow<{ premium: boolean }>({ + slug: 'test_non_cascade_skip', + }) + .step({ slug: 'base' }, async (input) => { + await delay(1); + return { + premium: input.premium, + value: input.premium ? 'premium' : 'basic', + }; + }) + .step( + { + slug: 'optional_enrichment', + dependsOn: ['base'], + if: { base: { premium: true } }, + whenUnmet: 'skip', // not cascade - downstream can still run + }, + async (deps) => { + stepWasCalled['optional_enrichment'] = true; + await delay(1); + return { enriched: deps.base }; + } + ) + .step( + { + slug: 'final_step', + dependsOn: ['base', 'optional_enrichment'], + }, + async (deps) => { + stepWasCalled['final_step'] = true; + await delay(1); + return { + base: deps.base, + enrichment: deps.optional_enrichment ?? null, + }; + } + ); + + const worker = startWorker(sql, NonCascadeSkipFlow, workerConfig); + + try { + await createFlowInDb(sql, NonCascadeSkipFlow); + + const flowRun = await startFlow(sql, NonCascadeSkipFlow, { + premium: false, + }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'completed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + assertEquals(stepStates.length, 3); + + const baseState = stepStates.find((s) => s.step_slug === 'base'); + assertEquals(baseState?.status, 'completed'); + + const enrichmentState = stepStates.find( + (s) => s.step_slug === 'optional_enrichment' + ); + assertEquals(enrichmentState?.status, 'skipped'); + assertEquals(enrichmentState?.skip_reason, 'condition_unmet'); + + const finalState = stepStates.find((s) => s.step_slug === 'final_step'); + assertEquals(finalState?.status, 'completed'); + assertEquals(finalState?.skip_reason, null); + + // Only leaf steps (steps with no dependents) that completed are included in output + // 'base' is not a leaf, 'optional_enrichment' was skipped, only 'final_step' is a completed leaf + assertEquals(polledRun.output, { + final_step: { + base: { premium: false, value: 'basic' }, + enrichment: null, + }, + }); + + assertEquals( + stepWasCalled['optional_enrichment'], + false, + 'optional_enrichment should NOT have been called' + ); + assertEquals( + stepWasCalled['final_step'], + true, + 'final_step should have been called' + ); + } finally { + await worker.stop(); + } + }) +); + +// ============================================================================= +// Test 7: Condition unmet with whenUnmet='fail' causes run failure +// ============================================================================= +Deno.test( + 'condition unmet with when_unmet=fail causes run failure', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const stepWasCalled: Record = { + premium_only: false, + after_premium: false, + }; + + const FailOnUnmetFlow = new Flow<{ premium: boolean }>({ + slug: 'test_fail_on_unmet', + }) + .step({ slug: 'base' }, async (input) => { + await delay(1); + return { premium: input.premium }; + }) + .step( + { + slug: 'premium_only', + dependsOn: ['base'], + if: { base: { premium: true } }, + whenUnmet: 'fail', + }, + async (deps) => { + stepWasCalled['premium_only'] = true; + await delay(1); + return { accessed: true, from: deps.base }; + } + ) + .step( + { slug: 'after_premium', dependsOn: ['premium_only'] }, + async (deps) => { + stepWasCalled['after_premium'] = true; + await delay(1); + return { received: deps.premium_only }; + } + ); + + const worker = startWorker(sql, FailOnUnmetFlow, workerConfig); + + try { + await createFlowInDb(sql, FailOnUnmetFlow); + + const flowRun = await startFlow(sql, FailOnUnmetFlow, { + premium: false, + }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'failed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + assertEquals(stepStates.length, 3); + + const baseState = stepStates.find((s) => s.step_slug === 'base'); + assertEquals(baseState?.status, 'completed'); + assertEquals(baseState?.skip_reason, null); + + const premiumState = stepStates.find( + (s) => s.step_slug === 'premium_only' + ); + assertEquals(premiumState?.status, 'failed'); + assertEquals(premiumState?.skip_reason, null); + + assertEquals( + stepWasCalled['premium_only'], + false, + 'premium_only should NOT have been called' + ); + assertEquals( + stepWasCalled['after_premium'], + false, + 'after_premium should NOT have been called' + ); + } finally { + await worker.stop(); + } + }) +); diff --git a/pkgs/edge-worker/tests/integration/flow/retriesExhausted.test.ts b/pkgs/edge-worker/tests/integration/flow/retriesExhausted.test.ts new file mode 100644 index 000000000..573db8f61 --- /dev/null +++ b/pkgs/edge-worker/tests/integration/flow/retriesExhausted.test.ts @@ -0,0 +1,440 @@ +import { assert, assertEquals } from '@std/assert'; +import { withPgNoTransaction } from '../../db.ts'; +import { Flow } from '@pgflow/dsl'; +import { delay } from '@std/async'; +import { startFlow, startWorker } from '../_helpers.ts'; +import { waitForRunCompletion } from './_testHelpers.ts'; +import { + createFlowInDb, + getStepStatesWithSkip, + getStepTasksWithError, +} from './_conditionalHelpers.ts'; + +// Common worker config for all tests +const workerConfig = { + maxConcurrent: 1, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 200, +} as const; + +// ============================================================================= +// Test 1: Handler fails with when_failed='fail' (default) - run fails +// ============================================================================= +Deno.test( + 'retries exhausted with when_failed=fail causes run failure', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const FailOnErrorFlow = new Flow<{ value: number }>({ + slug: 'test_fail_on_error', + }) + .step({ slug: 'first' }, async (input) => { + await delay(1); + return { doubled: input.value * 2 }; + }) + .step( + { + slug: 'failing_step', + dependsOn: ['first'], + maxAttempts: 1, + // default retriesExhausted: 'fail' + }, + async () => { + await delay(1); + throw new Error('Handler intentionally failed'); + } + ) + .step( + { slug: 'after_fail', dependsOn: ['failing_step'] }, + async (deps) => { + await delay(1); + return { received: deps.failing_step }; + } + ); + + const worker = startWorker(sql, FailOnErrorFlow, workerConfig); + + try { + await createFlowInDb(sql, FailOnErrorFlow); + + const flowRun = await startFlow(sql, FailOnErrorFlow, { value: 21 }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'failed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + + const firstState = stepStates.find((s) => s.step_slug === 'first'); + assertEquals(firstState?.status, 'completed'); + + const failingState = stepStates.find( + (s) => s.step_slug === 'failing_step' + ); + assertEquals(failingState?.status, 'failed'); + assertEquals(failingState?.skip_reason, null); + + const tasks = await getStepTasksWithError(sql, flowRun.run_id); + const failingTask = tasks.find((t) => t.step_slug === 'failing_step'); + assertEquals(failingTask?.status, 'failed'); + assert( + failingTask?.error_message?.includes('Handler intentionally failed'), + 'Error message should be preserved' + ); + } finally { + await worker.stop(); + } + }) +); + +// ============================================================================= +// Test 2: Handler fails with when_failed='skip' - step skipped, run completes +// ============================================================================= +Deno.test( + 'retries exhausted with when_failed=skip skips step with handler_failed', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const SkipOnErrorFlow = new Flow<{ value: number }>({ + slug: 'test_skip_on_error', + }) + .step({ slug: 'first' }, async (input) => { + await delay(1); + return { doubled: input.value * 2 }; + }) + .step( + { + slug: 'optional_step', + dependsOn: ['first'], + maxAttempts: 1, + retriesExhausted: 'skip', + }, + async () => { + await delay(1); + throw new Error('Optional step failed'); + } + ) + .step( + { slug: 'final', dependsOn: ['first', 'optional_step'] }, + async (deps) => { + await delay(1); + return { + first: deps.first, + optional: deps.optional_step ?? null, + }; + } + ); + + const worker = startWorker(sql, SkipOnErrorFlow, workerConfig); + + try { + await createFlowInDb(sql, SkipOnErrorFlow); + + const flowRun = await startFlow(sql, SkipOnErrorFlow, { value: 10 }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'completed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + + const optionalState = stepStates.find( + (s) => s.step_slug === 'optional_step' + ); + assertEquals(optionalState?.status, 'skipped'); + assertEquals(optionalState?.skip_reason, 'handler_failed'); + assert(optionalState?.skipped_at !== null, 'skipped_at should be set'); + + const finalState = stepStates.find((s) => s.step_slug === 'final'); + assertEquals(finalState?.status, 'completed'); + + const tasks = await getStepTasksWithError(sql, flowRun.run_id); + const optionalTask = tasks.find((t) => t.step_slug === 'optional_step'); + assert( + optionalTask?.error_message?.includes('Optional step failed'), + 'Error message should be preserved even when step skipped' + ); + } finally { + await worker.stop(); + } + }) +); + +// ============================================================================= +// Test 3: Handler fails with when_failed='skip-cascade' - cascades to dependents +// ============================================================================= +Deno.test( + 'retries exhausted with when_failed=skip-cascade skips dependents', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const SkipCascadeOnErrorFlow = new Flow<{ value: number }>({ + slug: 'test_skip_cascade_error', + }) + .step({ slug: 'first' }, async (input) => { + await delay(1); + return { value: input.value }; + }) + .step( + { + slug: 'risky_step', + dependsOn: ['first'], + maxAttempts: 1, + retriesExhausted: 'skip-cascade', + }, + async () => { + await delay(1); + throw new Error('Risky operation failed'); + } + ) + .step( + { slug: 'depends_on_risky', dependsOn: ['risky_step'] }, + async (deps) => { + await delay(1); + return { used: deps.risky_step }; + } + ); + + const worker = startWorker(sql, SkipCascadeOnErrorFlow, workerConfig); + + try { + await createFlowInDb(sql, SkipCascadeOnErrorFlow); + + const flowRun = await startFlow(sql, SkipCascadeOnErrorFlow, { + value: 5, + }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'completed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + + const riskyState = stepStates.find((s) => s.step_slug === 'risky_step'); + assertEquals(riskyState?.status, 'skipped'); + assertEquals(riskyState?.skip_reason, 'handler_failed'); + assert(riskyState?.skipped_at !== null); + + const dependentState = stepStates.find( + (s) => s.step_slug === 'depends_on_risky' + ); + assertEquals(dependentState?.status, 'skipped'); + assertEquals(dependentState?.skip_reason, 'dependency_skipped'); + assert(dependentState?.skipped_at !== null); + + const tasks = await getStepTasksWithError(sql, flowRun.run_id); + const riskyTask = tasks.find((t) => t.step_slug === 'risky_step'); + assert(riskyTask?.error_message?.includes('Risky operation failed')); + } finally { + await worker.stop(); + } + }) +); + +// ============================================================================= +// Test 4: Multiple retries before exhaustion with skip +// ============================================================================= +Deno.test( + 'step retries multiple times before being skipped', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + let attemptCount = 0; + + const RetryThenSkipFlow = new Flow<{ maxAttempts: number }>({ + slug: 'test_retry_then_skip', + }) + .step({ slug: 'init' }, async (input) => { + await delay(1); + return { attempts: input.maxAttempts }; + }) + .step( + { + slug: 'flaky_step', + dependsOn: ['init'], + maxAttempts: 3, + baseDelay: 1, + retriesExhausted: 'skip', + }, + async () => { + attemptCount++; + await delay(1); + throw new Error(`Attempt ${attemptCount} failed`); + } + ) + .step( + { slug: 'after_flaky', dependsOn: ['init', 'flaky_step'] }, + async () => { + await delay(1); + return { completed: true }; + } + ); + + const worker = startWorker(sql, RetryThenSkipFlow, workerConfig); + + try { + await createFlowInDb(sql, RetryThenSkipFlow); + + const flowRun = await startFlow(sql, RetryThenSkipFlow, { + maxAttempts: 3, + }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'completed'); + + const tasks = await getStepTasksWithError(sql, flowRun.run_id); + const flakyTask = tasks.find((t) => t.step_slug === 'flaky_step'); + assertEquals(flakyTask?.attempts_count, 3, 'Should have made 3 attempts'); + assert(flakyTask?.error_message?.includes('Attempt 3 failed')); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + const flakyState = stepStates.find((s) => s.step_slug === 'flaky_step'); + assertEquals(flakyState?.status, 'skipped'); + assertEquals(flakyState?.skip_reason, 'handler_failed'); + } finally { + await worker.stop(); + } + }) +); + +// ============================================================================= +// Test 5: Handler succeeds - no skipping occurs (baseline) +// ============================================================================= +Deno.test( + 'successful handler runs normally even with skip configured', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const SuccessfulHandlerFlow = new Flow<{ value: number }>({ + slug: 'test_successful_handler', + }) + .step({ slug: 'first' }, async (input) => { + await delay(1); + return { value: input.value }; + }) + .step( + { + slug: 'maybe_skip', + dependsOn: ['first'], + retriesExhausted: 'skip', // configured to skip on failure + }, + async (deps) => { + await delay(1); + return { processed: deps.first.value * 2 }; + } + ) + .step({ slug: 'final', dependsOn: ['maybe_skip'] }, async (deps) => { + await delay(1); + return { result: deps.maybe_skip ?? null }; + }); + + const worker = startWorker(sql, SuccessfulHandlerFlow, workerConfig); + + try { + await createFlowInDb(sql, SuccessfulHandlerFlow); + + const flowRun = await startFlow(sql, SuccessfulHandlerFlow, { value: 5 }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'completed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + for (const state of stepStates) { + assertEquals(state.status, 'completed'); + assertEquals(state.skip_reason, null); + assertEquals(state.skipped_at, null); + } + + // Only leaf steps (steps with no dependents) that completed are included in output + assertEquals(polledRun.output, { + final: { result: { processed: 10 } }, + }); + } finally { + await worker.stop(); + } + }) +); + +// ============================================================================= +// Test 6: Combined conditions and failure handling +// ============================================================================= +Deno.test( + 'combined condition and failure skipping work together', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const CombinedConditionsFlow = new Flow<{ + premium: boolean; + risky: boolean; + }>({ + slug: 'test_combined_conditions', + }) + .step({ slug: 'base' }, async (input) => { + await delay(1); + return { premium: input.premium, risky: input.risky }; + }) + .step( + { + slug: 'conditional_risky', + dependsOn: ['base'], + if: { base: { risky: true } }, + maxAttempts: 1, + retriesExhausted: 'skip', + }, + async () => { + await delay(1); + throw new Error('Risky conditional operation failed'); + } + ) + .step( + { + slug: 'uses_conditional', + dependsOn: ['base', 'conditional_risky'], + }, + async (deps) => { + await delay(1); + return { + base: deps.base, + risky: deps.conditional_risky ?? null, + }; + } + ); + + const worker = startWorker(sql, CombinedConditionsFlow, workerConfig); + + try { + await createFlowInDb(sql, CombinedConditionsFlow); + + const flowRun = await startFlow(sql, CombinedConditionsFlow, { + premium: false, + risky: true, + }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + assertEquals(polledRun.status, 'completed'); + + const stepStates = await getStepStatesWithSkip(sql, flowRun.run_id); + + const conditionalState = stepStates.find( + (s) => s.step_slug === 'conditional_risky' + ); + assertEquals(conditionalState?.status, 'skipped'); + assertEquals(conditionalState?.skip_reason, 'handler_failed'); + assert(conditionalState?.skipped_at !== null); + + const usesState = stepStates.find( + (s) => s.step_slug === 'uses_conditional' + ); + assertEquals(usesState?.status, 'completed'); + + const tasks = await getStepTasksWithError(sql, flowRun.run_id); + const conditionalTask = tasks.find( + (t) => t.step_slug === 'conditional_risky' + ); + assert( + conditionalTask?.error_message?.includes( + 'Risky conditional operation failed' + ) + ); + } finally { + await worker.stop(); + } + }) +);