Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
428 changes: 214 additions & 214 deletions docs/superpowers/plans/2026-04-02-split-run-plan-pipeline.md

Large diffs are not rendered by default.

36 changes: 18 additions & 18 deletions docs/superpowers/plans/2026-04-03-extract-constraints.md
Original file line number Diff line number Diff line change
Expand Up @@ -594,19 +594,19 @@ git commit -m "Add FilenameEnum entries for extract_constraints stage"
### Task 4: Create the pipeline stage wrapper

**Files:**
- Create: `worker_plan/worker_plan_internal/plan/stages/extract_constraints.py`
- Create: `worker_plan/worker_plan_internal/plan/nodes/extract_constraints.py`

- [ ] **Step 1: Create the stage file**

Create `worker_plan/worker_plan_internal/plan/stages/extract_constraints.py`:
Create `worker_plan/worker_plan_internal/plan/nodes/extract_constraints.py`:

```python
"""Pipeline stage: extract constraints from user prompt."""
from llama_index.core.llms.llm import LLM
from worker_plan_internal.plan.run_plan_pipeline import PlanTask
from worker_plan_internal.diagnostics.extract_constraints import ExtractConstraints
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.plan.stages.setup import SetupTask
from worker_plan_internal.plan.nodes.setup import SetupTask


class ExtractConstraintsTask(PlanTask):
Expand Down Expand Up @@ -637,13 +637,13 @@ class ExtractConstraintsTask(PlanTask):

- [ ] **Step 2: Verify syntax**

Run: `cd worker_plan && /opt/homebrew/bin/python3.11 -c "import ast; ast.parse(open('worker_plan_internal/plan/stages/extract_constraints.py').read()); print('OK')"`
Run: `cd worker_plan && /opt/homebrew/bin/python3.11 -c "import ast; ast.parse(open('worker_plan_internal/plan/nodes/extract_constraints.py').read()); print('OK')"`
Expected: `OK`

- [ ] **Step 3: Commit**

```bash
git add worker_plan/worker_plan_internal/plan/stages/extract_constraints.py
git add worker_plan/worker_plan_internal/plan/nodes/extract_constraints.py
git commit -m "Add ExtractConstraintsTask pipeline stage wrapper"
```

Expand All @@ -652,15 +652,15 @@ git commit -m "Add ExtractConstraintsTask pipeline stage wrapper"
### Task 5: Wire into the full pipeline

**Files:**
- Modify: `worker_plan/worker_plan_internal/plan/stages/full_plan_pipeline.py:10` (add import)
- Modify: `worker_plan/worker_plan_internal/plan/stages/full_plan_pipeline.py:90-91` (add to requires dict)
- Modify: `worker_plan/worker_plan_internal/plan/nodes/full_plan_pipeline.py:10` (add import)
- Modify: `worker_plan/worker_plan_internal/plan/nodes/full_plan_pipeline.py:90-91` (add to requires dict)

- [ ] **Step 1: Add the import**

In `worker_plan/worker_plan_internal/plan/stages/full_plan_pipeline.py`, add after line 10 (`from ... import ScreenPlanningPromptTask`):
In `worker_plan/worker_plan_internal/plan/nodes/full_plan_pipeline.py`, add after line 10 (`from ... import ScreenPlanningPromptTask`):

```python
from worker_plan_internal.plan.stages.extract_constraints import ExtractConstraintsTask
from worker_plan_internal.plan.nodes.extract_constraints import ExtractConstraintsTask
```

- [ ] **Step 2: Add to the requires() dict**
Expand All @@ -673,13 +673,13 @@ In the `requires()` method, add after the `'screen_planning_prompt'` entry (line

- [ ] **Step 3: Verify syntax**

Run: `cd worker_plan && /opt/homebrew/bin/python3.11 -c "from worker_plan_internal.plan.stages.full_plan_pipeline import FullPlanPipeline; print('OK')"`
Run: `cd worker_plan && /opt/homebrew/bin/python3.11 -c "from worker_plan_internal.plan.nodes.full_plan_pipeline import FullPlanPipeline; print('OK')"`
Expected: `OK`

- [ ] **Step 4: Commit**

```bash
git add worker_plan/worker_plan_internal/plan/stages/full_plan_pipeline.py
git add worker_plan/worker_plan_internal/plan/nodes/full_plan_pipeline.py
git commit -m "Register extract_constraints in the full plan pipeline"
```

Expand All @@ -688,16 +688,16 @@ git commit -m "Register extract_constraints in the full plan pipeline"
### Task 6: Add to the report

**Files:**
- Modify: `worker_plan/worker_plan_internal/plan/stages/report.py:28` (add import)
- Modify: `worker_plan/worker_plan_internal/plan/stages/report.py:41-42` (add to requires dict)
- Modify: `worker_plan/worker_plan_internal/plan/stages/report.py:90-91` (add append_markdown call)
- Modify: `worker_plan/worker_plan_internal/plan/nodes/report.py:28` (add import)
- Modify: `worker_plan/worker_plan_internal/plan/nodes/report.py:41-42` (add to requires dict)
- Modify: `worker_plan/worker_plan_internal/plan/nodes/report.py:90-91` (add append_markdown call)

- [ ] **Step 1: Add the import**

In `worker_plan/worker_plan_internal/plan/stages/report.py`, add after line 28 (`from ... import ScreenPlanningPromptTask`):
In `worker_plan/worker_plan_internal/plan/nodes/report.py`, add after line 28 (`from ... import ScreenPlanningPromptTask`):

```python
from worker_plan_internal.plan.stages.extract_constraints import ExtractConstraintsTask
from worker_plan_internal.plan.nodes.extract_constraints import ExtractConstraintsTask
```

- [ ] **Step 2: Add to the requires() dict**
Expand All @@ -718,13 +718,13 @@ In `run_inner()`, add after the Self Audit line (line 90, `rg.append_markdown_wi

- [ ] **Step 4: Verify syntax**

Run: `cd worker_plan && /opt/homebrew/bin/python3.11 -c "from worker_plan_internal.plan.stages.report import ReportTask; print('OK')"`
Run: `cd worker_plan && /opt/homebrew/bin/python3.11 -c "from worker_plan_internal.plan.nodes.report import ReportTask; print('OK')"`
Expected: `OK`

- [ ] **Step 5: Commit**

```bash
git add worker_plan/worker_plan_internal/plan/stages/report.py
git add worker_plan/worker_plan_internal/plan/nodes/report.py
git commit -m "Add extract_constraints to the HTML report"
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ To insert a new `ValidateBudgetTask` between `CurrencyStrategyTask` and `Identif

1. Create `stages/validate_budget.py`:
```python
from worker_plan_internal.plan.stages.currency_strategy import CurrencyStrategyTask
from worker_plan_internal.plan.nodes.currency_strategy import CurrencyStrategyTask

class ValidateBudgetTask(PlanTask):
def requires(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Output:

## Pipeline Stage

**Stage file:** `worker_plan_internal/plan/stages/extract_constraints.py`
**Stage file:** `worker_plan_internal/plan/nodes/extract_constraints.py`

- `ExtractConstraintsTask` extends `PlanTask`
- `requires()` — depends on `SetupTask` only (reads the raw user prompt)
Expand Down Expand Up @@ -101,10 +101,10 @@ EXTRACT_CONSTRAINTS_MARKDOWN = "002-0-extract_constraints.md"

**New files:**
- `worker_plan_internal/diagnostics/extract_constraints.py` — Pydantic models, dataclass, system prompt, execute()
- `worker_plan_internal/plan/stages/extract_constraints.py` — PlanTask wrapper
- `worker_plan_internal/plan/nodes/extract_constraints.py` — PlanTask wrapper
- `worker_plan_internal/diagnostics/tests/test_extract_constraints.py` — unit + LLM integration tests

**Modified files:**
- `worker_plan_api/filenames.py` — add FilenameEnum entries
- `worker_plan_internal/plan/stages/full_plan_pipeline.py` — register stage
- `worker_plan_internal/plan/nodes/full_plan_pipeline.py` — register stage
- `worker_plan_internal/report/report_generator.py` — add to report
2 changes: 1 addition & 1 deletion worker_plan/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ consumers.
`rate_limit`, etc.) stored in `usage_metrics.jsonl`. Unknown errors preserve
a truncated `error_detail` field.

## Pipeline Stages (`worker_plan_internal/plan/stages/`)
## Pipeline Stages (`worker_plan_internal/plan/nodes/`)

Each Luigi pipeline task lives in its own file under `stages/`. This enables:
- Multiple agents working on different stages without merge conflicts
Expand Down
8 changes: 4 additions & 4 deletions worker_plan/worker_plan_internal/extract_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
# Module prefixes that are infrastructure/utilities, not implementation logic.
# Imports from these are excluded from source_files auto-detection.
_INFRASTRUCTURE_PREFIXES = (
"worker_plan_internal.plan.stages.",
"worker_plan_internal.plan.nodes.",
"worker_plan_internal.plan.run_plan_pipeline",
"worker_plan_internal.plan.pipeline_environment",
"worker_plan_internal.plan.ping_llm",
Expand Down Expand Up @@ -171,7 +171,7 @@ def extract_dag() -> dict[str, Any]:

Returns a top-level schema object with stages sorted by pipeline order.
"""
from worker_plan_internal.plan.stages.full_plan_pipeline import FullPlanPipeline
from worker_plan_internal.plan.nodes.full_plan_pipeline import FullPlanPipeline

root = FullPlanPipeline(run_id_dir=Path("/tmp/_dag_extract_dummy"))

Expand Down Expand Up @@ -220,7 +220,7 @@ def _walk(task: luigi.Task) -> None:
"schema_version": "1.0",
"pipeline_name": "planning_pipeline",
"description": "DAG for PlanExe, an AI-driven project planning system.",
"stages": stages,
"nodes": stages,
}


Expand All @@ -235,7 +235,7 @@ def main() -> None:

if output_path:
Path(output_path).write_text(dag_json + "\n", encoding="utf-8")
print(f"Wrote {len(dag['stages'])} stages to {output_path}", file=sys.stderr)
print(f"Wrote {len(dag['nodes'])} stages to {output_path}", file=sys.stderr)
else:
print(dag_json)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from worker_plan_internal.lever.candidate_scenarios import CandidateScenarios
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.llm_util.llm_executor import LLMExecutor
from worker_plan_internal.plan.stages.setup import SetupTask
from worker_plan_internal.plan.stages.identify_purpose import IdentifyPurposeTask
from worker_plan_internal.plan.stages.plan_type import PlanTypeTask
from worker_plan_internal.plan.stages.focus_on_vital_few_levers import FocusOnVitalFewLeversTask
from worker_plan_internal.plan.nodes.setup import SetupTask
from worker_plan_internal.plan.nodes.identify_purpose import IdentifyPurposeTask
from worker_plan_internal.plan.nodes.plan_type import PlanTypeTask
from worker_plan_internal.plan.nodes.focus_on_vital_few_levers import FocusOnVitalFewLeversTask


class CandidateScenariosTask(PlanTask):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
from worker_plan_internal.assume.shorten_markdown import ShortenMarkdown
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.llm_util.llm_executor import LLMExecutor, PipelineStopRequested
from worker_plan_internal.plan.stages.identify_purpose import IdentifyPurposeTask
from worker_plan_internal.plan.stages.plan_type import PlanTypeTask
from worker_plan_internal.plan.stages.physical_locations import PhysicalLocationsTask
from worker_plan_internal.plan.stages.currency_strategy import CurrencyStrategyTask
from worker_plan_internal.plan.stages.identify_risks import IdentifyRisksTask
from worker_plan_internal.plan.stages.make_assumptions import MakeAssumptionsTask
from worker_plan_internal.plan.stages.distill_assumptions import DistillAssumptionsTask
from worker_plan_internal.plan.stages.review_assumptions import ReviewAssumptionsTask
from worker_plan_internal.plan.nodes.identify_purpose import IdentifyPurposeTask
from worker_plan_internal.plan.nodes.plan_type import PlanTypeTask
from worker_plan_internal.plan.nodes.physical_locations import PhysicalLocationsTask
from worker_plan_internal.plan.nodes.currency_strategy import CurrencyStrategyTask
from worker_plan_internal.plan.nodes.identify_risks import IdentifyRisksTask
from worker_plan_internal.plan.nodes.make_assumptions import MakeAssumptionsTask
from worker_plan_internal.plan.nodes.distill_assumptions import DistillAssumptionsTask
from worker_plan_internal.plan.nodes.review_assumptions import ReviewAssumptionsTask

logger = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""ConsolidateGovernanceTask - Combines all governance phase markdown documents."""
from worker_plan_internal.plan.run_plan_pipeline import PlanTask
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.plan.stages.governance_phase1_audit import GovernancePhase1AuditTask
from worker_plan_internal.plan.stages.governance_phase2_bodies import GovernancePhase2BodiesTask
from worker_plan_internal.plan.stages.governance_phase3_impl_plan import GovernancePhase3ImplPlanTask
from worker_plan_internal.plan.stages.governance_phase4_decision_escalation_matrix import GovernancePhase4DecisionEscalationMatrixTask
from worker_plan_internal.plan.stages.governance_phase5_monitoring_progress import GovernancePhase5MonitoringProgressTask
from worker_plan_internal.plan.stages.governance_phase6_extra import GovernancePhase6ExtraTask
from worker_plan_internal.plan.nodes.governance_phase1_audit import GovernancePhase1AuditTask
from worker_plan_internal.plan.nodes.governance_phase2_bodies import GovernancePhase2BodiesTask
from worker_plan_internal.plan.nodes.governance_phase3_impl_plan import GovernancePhase3ImplPlanTask
from worker_plan_internal.plan.nodes.governance_phase4_decision_escalation_matrix import GovernancePhase4DecisionEscalationMatrixTask
from worker_plan_internal.plan.nodes.governance_phase5_monitoring_progress import GovernancePhase5MonitoringProgressTask
from worker_plan_internal.plan.nodes.governance_phase6_extra import GovernancePhase6ExtraTask


class ConsolidateGovernanceTask(PlanTask):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
from worker_plan_internal.plan.run_plan_pipeline import PlanTask
from worker_plan_internal.diagnostics.constraint_checker import ConstraintChecker
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.plan.stages.extract_constraints import ExtractConstraintsTask
from worker_plan_internal.plan.stages.potential_levers import PotentialLeversTask
from worker_plan_internal.plan.stages.deduplicate_levers import DeduplicateLeversTask
from worker_plan_internal.plan.stages.enrich_levers import EnrichLeversTask
from worker_plan_internal.plan.stages.focus_on_vital_few_levers import FocusOnVitalFewLeversTask
from worker_plan_internal.plan.stages.candidate_scenarios import CandidateScenariosTask
from worker_plan_internal.plan.stages.select_scenario import SelectScenarioTask
from worker_plan_internal.plan.nodes.extract_constraints import ExtractConstraintsTask
from worker_plan_internal.plan.nodes.potential_levers import PotentialLeversTask
from worker_plan_internal.plan.nodes.deduplicate_levers import DeduplicateLeversTask
from worker_plan_internal.plan.nodes.enrich_levers import EnrichLeversTask
from worker_plan_internal.plan.nodes.focus_on_vital_few_levers import FocusOnVitalFewLeversTask
from worker_plan_internal.plan.nodes.candidate_scenarios import CandidateScenariosTask
from worker_plan_internal.plan.nodes.select_scenario import SelectScenarioTask


def _read_constraints_json(task: PlanTask) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from worker_plan_internal.pitch.convert_pitch_to_markdown import ConvertPitchToMarkdown
from worker_plan_internal.format_json_for_use_in_query import format_json_for_use_in_query
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.plan.stages.create_pitch import CreatePitchTask
from worker_plan_internal.plan.nodes.create_pitch import CreatePitchTask


class ConvertPitchToMarkdownTask(PlanTask):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
from worker_plan_internal.wbs.wbs_task import WBSProject
from worker_plan_internal.format_json_for_use_in_query import format_json_for_use_in_query
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.plan.stages.strategic_decisions_markdown import StrategicDecisionsMarkdownTask
from worker_plan_internal.plan.stages.scenarios_markdown import ScenariosMarkdownTask
from worker_plan_internal.plan.stages.project_plan import ProjectPlanTask
from worker_plan_internal.plan.stages.wbs_project_level1_and_level2 import WBSProjectLevel1AndLevel2Task
from worker_plan_internal.plan.stages.related_resources import RelatedResourcesTask
from worker_plan_internal.plan.nodes.strategic_decisions_markdown import StrategicDecisionsMarkdownTask
from worker_plan_internal.plan.nodes.scenarios_markdown import ScenariosMarkdownTask
from worker_plan_internal.plan.nodes.project_plan import ProjectPlanTask
from worker_plan_internal.plan.nodes.wbs_project_level1_and_level2 import WBSProjectLevel1AndLevel2Task
from worker_plan_internal.plan.nodes.related_resources import RelatedResourcesTask

logger = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
from worker_plan_internal.wbs.wbs_task_tooltip import WBSTaskTooltip
from worker_plan_internal.plan.pipeline_config import PIPELINE_CONFIG
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.plan.stages.start_time import StartTimeTask
from worker_plan_internal.plan.stages.create_wbs_level1 import CreateWBSLevel1Task
from worker_plan_internal.plan.stages.identify_task_dependencies import IdentifyTaskDependenciesTask
from worker_plan_internal.plan.stages.estimate_task_durations import EstimateTaskDurationsTask
from worker_plan_internal.plan.stages.wbs_project_level1_level2_level3 import WBSProjectLevel1AndLevel2AndLevel3Task
from worker_plan_internal.plan.nodes.start_time import StartTimeTask
from worker_plan_internal.plan.nodes.create_wbs_level1 import CreateWBSLevel1Task
from worker_plan_internal.plan.nodes.identify_task_dependencies import IdentifyTaskDependenciesTask
from worker_plan_internal.plan.nodes.estimate_task_durations import EstimateTaskDurationsTask
from worker_plan_internal.plan.nodes.wbs_project_level1_level2_level3 import WBSProjectLevel1AndLevel2AndLevel3Task


class CreateScheduleTask(PlanTask):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from worker_plan_internal.plan.create_wbs_level1 import CreateWBSLevel1
from worker_plan_internal.format_json_for_use_in_query import format_json_for_use_in_query
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.plan.stages.project_plan import ProjectPlanTask
from worker_plan_internal.plan.nodes.project_plan import ProjectPlanTask

logger = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from worker_plan_internal.plan.create_wbs_level2 import CreateWBSLevel2
from worker_plan_internal.format_json_for_use_in_query import format_json_for_use_in_query
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.plan.stages.strategic_decisions_markdown import StrategicDecisionsMarkdownTask
from worker_plan_internal.plan.stages.scenarios_markdown import ScenariosMarkdownTask
from worker_plan_internal.plan.stages.project_plan import ProjectPlanTask
from worker_plan_internal.plan.stages.create_wbs_level1 import CreateWBSLevel1Task
from worker_plan_internal.plan.stages.data_collection import DataCollectionTask
from worker_plan_internal.plan.nodes.strategic_decisions_markdown import StrategicDecisionsMarkdownTask
from worker_plan_internal.plan.nodes.scenarios_markdown import ScenariosMarkdownTask
from worker_plan_internal.plan.nodes.project_plan import ProjectPlanTask
from worker_plan_internal.plan.nodes.create_wbs_level1 import CreateWBSLevel1Task
from worker_plan_internal.plan.nodes.data_collection import DataCollectionTask

logger = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
from worker_plan_api.speedvsdetail import SpeedVsDetailEnum
from worker_plan_internal.format_json_for_use_in_query import format_json_for_use_in_query
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.plan.stages.project_plan import ProjectPlanTask
from worker_plan_internal.plan.stages.wbs_project_level1_and_level2 import WBSProjectLevel1AndLevel2Task
from worker_plan_internal.plan.stages.estimate_task_durations import EstimateTaskDurationsTask
from worker_plan_internal.plan.stages.data_collection import DataCollectionTask
from worker_plan_internal.plan.nodes.project_plan import ProjectPlanTask
from worker_plan_internal.plan.nodes.wbs_project_level1_and_level2 import WBSProjectLevel1AndLevel2Task
from worker_plan_internal.plan.nodes.estimate_task_durations import EstimateTaskDurationsTask
from worker_plan_internal.plan.nodes.data_collection import DataCollectionTask

logger = logging.getLogger(__name__)

Expand Down
Loading