Adds stage-level resume support for chained workflows so compatible completed stages can be reused, matching partial stages can continue through the existing single-stage resume path, and downstream stages rerun when upstream outputs change.
🔗 Related Issue
N/A
🔄 Changes
Add CompositeWorkflow.run(resume=...) with completed-stage reuse and partial-stage delegation.
Invalidate downstream stages when an upstream stage reruns, changes, or has missing selected/callback output.
Harden workflow metadata writes and let ResumeMode.IF_POSSIBLE fall back to fresh runs when prior metadata is unusable.
Add public workflow resume tests for skip, rerun, partial/failed-stage resume, callback-output, output-processor, completed-empty, corrupt metadata, and strict resume behavior.
Document workflow resume behavior in MkDocs and Fern docs.
Update the workflow chaining plan with the completed stage-level resume slice.
.venv/bin/ruff format --check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py
.venv/bin/pytest /home/ubuntu/Code/reviews/DataDesigner-747/smoke_test.py -q -s - 2 passed against NVIDIA Build (nvidia/nemotron-3-nano-30b-a3b) and NVIDIA Inference (openai/openai/gpt-5.4-nano) using /home/ubuntu/Code/.env
Note: full .venv/bin/ruff check --fix . currently hits an unrelated existing generated-notebook lint in docs/colab_notebooks/7-nemotron-personas.ipynb (F404).
Fern previews include the docs-website version archive with PR changes synced into latest. Notebook tutorials are rendered without execution outputs in previews.
This PR adds stage-level resume support to CompositeWorkflow.run(resume=...). Completed stages with matching fingerprints are reused, partially-run stages delegate to the single-stage resume path, and any stage that runs marks all descendants for forced rerun via force_rerun_downstream.
Core logic (composite_workflow.py): introduces _read_prior_workflow_metadata, _can_skip_prior_stage, _get_prior_stage_metadata, _stage_result_from_metadata, and supporting helpers for path normalisation; hardens metadata writes with a pid/uuid-namespaced temp file and os.replace for atomicity.
Tests (test_composite_workflow.py): adds 12 new parametrised tests covering skip, changed-stage rerun, partial/failed-stage resume, missing callback output, moved artifacts, corrupt metadata, and strict-resume rejection.
Docs (workflow-chaining.mdx, workflow-chaining.md): adds a ## Resume section and updates the current-limits list and plan status.
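The reuse / resume / rerun split described above can be sketched roughly as follows. This is a minimal illustration, not the code in composite_workflow.py: PriorStage, stage_fingerprint, and plan_stages are hypothetical names, and the SHA-256-over-JSON fingerprint is an assumption. Only the force_rerun_downstream flag and the idea that each stage's fingerprint folds in its upstream fingerprint come from the PR discussion.

```python
import hashlib
import json
from dataclasses import dataclass


@dataclass
class PriorStage:
    """Hypothetical record of what a previous run wrote for one stage."""

    fingerprint: str
    completed: bool


def stage_fingerprint(config: dict, upstream_fingerprint: str) -> str:
    """Hash the stage config together with the upstream stage's fingerprint."""
    payload = json.dumps(
        {"config": config, "upstream": upstream_fingerprint}, sort_keys=True
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def plan_stages(configs: list[dict], prior: list[PriorStage]) -> list[str]:
    """Return one action per stage: "reuse", "resume", or "rerun"."""
    actions = []
    upstream = ""
    force_rerun_downstream = False
    for index, config in enumerate(configs):
        fingerprint = stage_fingerprint(config, upstream)
        previous = prior[index] if index < len(prior) else None
        matches = previous is not None and previous.fingerprint == fingerprint
        if force_rerun_downstream or not matches:
            action = "rerun"
        elif previous.completed:
            action = "reuse"
        else:
            action = "resume"
        # Any stage that actually executes invalidates every later stage.
        if action != "reuse":
            force_rerun_downstream = True
        actions.append(action)
        upstream = fingerprint
    return actions
```

In this sketch a partially completed second stage yields "resume" for itself and "rerun" for everything after it, which mirrors the downstream invalidation the summary describes.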
Confidence Score: 5/5
Safe to merge. The resume logic is well-encapsulated, the existing fresh-run path is unchanged, and the new code is gated entirely behind the new resume parameter which defaults to ResumeMode.NEVER.
The fingerprint comparison, downstream invalidation via force_rerun_downstream, callback-output validation in _can_skip_prior_stage, and the ALWAYS/IF_POSSIBLE branching all behave as intended across the 12 new tests. Path normalisation to relative values is handled symmetrically in write and read helpers. The atomic metadata write is correct. No logic errors were found.
Docs added in both mkdocs and Fern; phase-3 plan updated.
Diff is well-scoped (5 files, +503/−10) and lives entirely in the interface layer — no engine/config touch, no import-direction violations.
Findings
Correctness — Medium
1. output_seed_path is persisted and reused as an absolute path (composite_workflow.py:400, consumed at composite_workflow.py:309). If a user moves their artifact root (rsync, archive, container mount), prior_metadata still matches by name/stage_dir, but output_seed_path points at the old location while _stage_result_from_metadata builds an ArtifactStorage rooted at the new workflow_path. The result is either a mixed-state read (old data, new storage object) or a raise from _count_parquet_records: reuse is silently invalidated in the best case, and the behavior is confusing in the worst. Storing this as a path relative to workflow_path would be more robust. (The same applies to callback_output_path.)
2. ResumeMode.ALWAYS semantics drift once any stage is re-run. The added doc says: "If a stage changed or its selected output is missing, the workflow raises instead of starting fresh." But once an ALWAYS resume successfully resumes a partial upstream stage from checkpoint (stage_resume == ALWAYS, line 328), force_rerun_downstream flips to True and the downstream elif resume == ResumeMode.ALWAYS and not force_rerun_downstream: guard (line 330) is bypassed, so a downstream stage whose fingerprint also differs from prior metadata silently runs fresh instead of raising. That may be the intended behavior, but it deviates from the doc and from a strict reading of the test_composite_workflow_resume_always_rejects_changed_stage contract. Consider tightening the doc and adding a test for "ALWAYS, upstream resumed-from-checkpoint, downstream fingerprint differs".
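For the path-portability finding, one possible shape for the fix is sketched below. The helper names to_stored_path and from_stored_path are hypothetical and not part of the PR. The sketch assumes paths under workflow_path are written relative to it and re-anchored on read, while paths outside it keep their absolute form.

```python
from pathlib import Path


def to_stored_path(path: Path, workflow_path: Path) -> str:
    """Serialize a path relative to workflow_path when it lives under it."""
    try:
        return path.relative_to(workflow_path).as_posix()
    except ValueError:
        # Outside the workflow directory: fall back to the path as given.
        return path.as_posix()


def from_stored_path(stored: str, workflow_path: Path) -> Path:
    """Re-anchor a stored relative path at the current workflow_path."""
    candidate = Path(stored)
    return candidate if candidate.is_absolute() else workflow_path / candidate
```

With this shape, metadata written under one artifact root resolves against whatever workflow_path the resuming process uses, so a moved directory would not leave output_seed_path pointing at the old location.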
Correctness — Low
3. _load_stage_analysis swallows every exception (composite_workflow.py:552). The bare except Exception masks both ValidationError (the only one you'd plausibly recover from) and unrelated bugs introduced by future schema changes. Narrow it to pydantic.ValidationError so a real bug doesn't decay into a silent analysis=None.
4. stage_metadata.update(prior_stage_metadata) on the skip path (line 308) wholesale-imports every key the prior run wrote, including ones not produced by the current run (config, seed_path, seeded_from_stage, num_records_requested, duration_sec). That's by design here, but it means the new metadata file's seeded_from_stage for a re-skipped stage may name a stage from the prior run that no longer exists in the current workflow definition. Fine for inspection; worth a note if later phases lean on those fields.
5. _stage_result_from_metadata returns an empty DatasetMetadata() (line 528). The original run may have collected real DatasetMetadata, so reusing the cached stage exposes a stripped-down DatasetCreationResults to user code. Either document this or persist and rehydrate the metadata.
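The except-narrowing suggestion could look roughly like the sketch below. To keep it dependency-free, ValidationError here is a local stand-in for pydantic.ValidationError, and parse_analysis and load_stage_analysis are hypothetical helpers rather than the PR's code. The point is only that validation failures degrade to None while any other exception still propagates.

```python
import logging

logger = logging.getLogger(__name__)


class ValidationError(ValueError):
    """Stand-in for pydantic.ValidationError so the sketch has no dependencies."""


def parse_analysis(raw: dict) -> dict:
    """Hypothetical validator: require an integer num_records field."""
    if not isinstance(raw.get("num_records"), int):
        raise ValidationError("num_records must be an integer")
    return raw


def load_stage_analysis(raw: dict, parse=parse_analysis):
    """Return the parsed analysis, or None only when validation fails."""
    try:
        return parse(raw)
    except ValidationError as exc:
        # Expected failure mode: stale or malformed analysis payload.
        logger.warning("Ignoring unreadable stage analysis: %s", exc)
        return None
```

A KeyError or AttributeError raised inside the parser would surface to the caller instead of turning into a silent analysis=None.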
Style / Nits
Unused import in TYPE_CHECKING removal: DatasetProfilerResults was lifted out of TYPE_CHECKING (line 18) because it's now used at module scope by _load_stage_analysis. That's correct, but DatasetProfilerResults is heavy; the project lazy-loads pandas already (lazy.pd). Confirm that data_designer.config.analysis.dataset_profiler doesn't drag in numpy/pandas at import — if it does, this regresses the import-time profile (make perf-import would catch it).
tmp_path = path.with_name(f"{path.name}.tmp.{os.getpid()}") (line 706): if two threads in the same process both write workflow metadata for the same workflow (uncommon but possible during async cleanup), they collide on the temp filename. Adding a uuid suffix would be safer; PID alone is not unique within a process. Probably out of scope.
force_rerun_downstream is largely redundant with the upstream-fingerprint chain (since each stage's fingerprint folds in upstream_fingerprint). Its real job is to short-circuit the ResumeMode.ALWAYS raise on line 330. A short comment to that effect would help future readers — right now the flag's necessity is non-obvious.
test_composite_workflow_resume_if_possible_skips_stage_with_output_processors checks main-batch mtime; consider also asserting the output-processors directory wasn't touched (its mtime is the actual cache-hit signal for the output-processor work).
Worth a regression test for prior metadata whose stages length exceeds the current workflow's stage list — _get_prior_stage_metadata handles index >= len(stages), but the inverse case (current workflow has fewer stages than prior) isn't explicitly covered.
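To make the force_rerun_downstream nit and the ALWAYS gap concrete, here is one reading of the branching as described in this review, reduced to a pure function. It is an assumption-laden sketch, not the code at lines 328-330: ResumeMode is a local stand-in enum, and WorkflowResumeError and decide are hypothetical names.

```python
from enum import Enum


class ResumeMode(Enum):
    """Local stand-in for the library enum; only the member names come from the PR."""

    NEVER = "never"
    IF_POSSIBLE = "if_possible"
    ALWAYS = "always"


class WorkflowResumeError(RuntimeError):
    """Hypothetical error for a strict resume that cannot proceed."""


def decide(resume: ResumeMode, fingerprint_matches: bool, force_rerun_downstream: bool) -> str:
    """Return "reuse_or_resume" or "fresh" for one stage, or raise under strict resume."""
    if resume is ResumeMode.NEVER:
        return "fresh"
    if fingerprint_matches and not force_rerun_downstream:
        return "reuse_or_resume"
    # The flag short-circuits the strict raise once an upstream stage has run.
    if resume is ResumeMode.ALWAYS and not force_rerun_downstream:
        raise WorkflowResumeError("stage changed; strict resume refuses to start fresh")
    return "fresh"
```

Under this reading, decide(ResumeMode.ALWAYS, False, True) returns "fresh" rather than raising, which is the case the suggested regression test would pin down.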
Security
No new attack surface. Metadata is JSON-validated on read and treated as data. The os.fsync + os.replace pattern is the right defense against torn writes from crashes mid-run.
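For readers unfamiliar with the pattern, a minimal sketch of an fsync-then-replace write with a pid- and uuid-namespaced temp file follows. The function name write_json_atomically is hypothetical and the PR's helper may differ in detail; the sketch uses only standard-library calls.

```python
import json
import os
import uuid
from pathlib import Path


def write_json_atomically(path: Path, payload: dict) -> None:
    """Write payload next to path, flush it to disk, then swap it into place."""
    # pid + uuid keeps concurrent writers (processes or threads) from colliding.
    tmp_path = path.with_name(f"{path.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}")
    try:
        with open(tmp_path, "w", encoding="utf-8") as handle:
            json.dump(payload, handle, indent=2)
            handle.flush()
            os.fsync(handle.fileno())
        # os.replace swaps the file in one step when both paths share a filesystem.
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp name no longer exists.
        tmp_path.unlink(missing_ok=True)
```

Readers of the target path see either the previous complete file or the new complete file, never a partially written one.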
Performance
One extra _count_parquet_records call per skippable stage (validation in _can_skip_prior_stage + the actual count in run). Cheap (parquet metadata, not row read), but doubling it is unnecessary — could cache the count or skip the validation since the count immediately follows.
_stage_result_from_metadata instantiates ArtifactStorage with resume=ResumeMode.ALWAYS, which triggers the resolved_dataset_name check. Fine for hits; on misses you'll get an ArtifactStorageError from the validator instead of going through DataDesignerWorkflowError. Consider catching and re-wrapping at the boundary so callers see a single error type per the project's "Errors normalize at boundaries" invariant.
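The re-wrapping suggested above might look like the following sketch. Both exception classes are local stand-ins for the library types named in this review, and open_prior_storage and stage_result_from_metadata are hypothetical simplifications, so treat this as an illustration of the boundary pattern only.

```python
class ArtifactStorageError(Exception):
    """Stand-in for the storage-layer error named in the review."""


class DataDesignerWorkflowError(Exception):
    """Stand-in for the workflow-level error named in the review."""


def open_prior_storage(stage_name: str, exists: bool) -> str:
    """Hypothetical storage lookup that fails when the prior dataset is missing."""
    if not exists:
        raise ArtifactStorageError(f"no prior dataset for stage {stage_name!r}")
    return f"storage:{stage_name}"


def stage_result_from_metadata(stage_name: str, exists: bool) -> str:
    """Translate storage-layer failures into the workflow's own error type."""
    try:
        return open_prior_storage(stage_name, exists)
    except ArtifactStorageError as exc:
        # "from exc" keeps the original error available as __cause__.
        raise DataDesignerWorkflowError(
            f"cannot reuse stage {stage_name!r}: {exc}"
        ) from exc
```

Callers then handle a single error type at the workflow boundary, and the storage-layer detail stays reachable through the exception chain.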
Docs
docs/concepts/workflow-chaining.md and the Fern mirror are updated symmetrically — good.
Phase-3 plan update accurately describes the slice and remaining deferred items.
One precision nit: the ResumeMode.ALWAYS blurb should clarify the "first changed stage raises; downstream of a checkpoint-resumed stage runs fresh" behavior (or close the gap).
Verdict
Solid, well-tested addition. The core logic is correct and the new behavior is opt-in through resume=, so risk to existing callers is minimal. Address #1 (relative paths) before this is relied on for portable artifacts; #2 (ALWAYS doc/test) and #3 (narrow the except) are easy follow-ups. Everything else is taste.
Recommendation: approve with the path-portability and except Exception narrowing addressed (or filed as follow-ups). Other findings are non-blocking polish.
🧪 Testing
make test passes
Ran:
.venv/bin/ruff format .
.venv/bin/ruff check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py
.venv/bin/ruff format --check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py
.venv/bin/pytest packages/data-designer/tests/interface/test_composite_workflow.py -q - 55 passed, 2 warnings
.venv/bin/pytest /home/ubuntu/Code/reviews/DataDesigner-747/smoke_test.py -q -s - 2 passed against NVIDIA Build (nvidia/nemotron-3-nano-30b-a3b) and NVIDIA Inference (openai/openai/gpt-5.4-nano) using /home/ubuntu/Code/.env
Note: full .venv/bin/ruff check --fix . currently hits an unrelated existing generated-notebook lint in docs/colab_notebooks/7-nemotron-personas.ipynb (F404).