Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2eb4963
feat: add DAG extractor that introspects Luigi task graph
neoneye Apr 7, 2026
3be325a
feat: add source_files to DAG extractor via PlanTask.source_files() +…
neoneye Apr 7, 2026
f8ccc22
refactor: move source_files logic from PlanTask into extract_dag.py
neoneye Apr 7, 2026
5e04c63
fix: narrow infrastructure prefix filter so plan/ implementation file…
neoneye Apr 7, 2026
c8816d7
feat: add description field to DAG extractor
neoneye Apr 7, 2026
195a993
fix: improve extract_constraints description
neoneye Apr 7, 2026
f64a256
fix: improve screen_planning_prompt description
neoneye Apr 7, 2026
4d10a71
fix: improve redline_gate description
neoneye Apr 7, 2026
fb5f0ee
fix: improve premise_attack description
neoneye Apr 7, 2026
c409adb
fix: improve potential_levers description
neoneye Apr 7, 2026
8225d7c
fix: improve deduplicate_levers description
neoneye Apr 7, 2026
aa1156a
fix: improve enrich_levers description
neoneye Apr 7, 2026
8753952
fix: improve descriptions for 5 unclear task docstrings
neoneye Apr 7, 2026
c629213
fix: improve descriptions for 5 more unclear task docstrings
neoneye Apr 7, 2026
328981b
fix: improve descriptions for 5 more task docstrings
neoneye Apr 7, 2026
2252052
fix: improve descriptions for 10 more task docstrings
neoneye Apr 7, 2026
e3bbfc9
fix: polish final 10 task descriptions
neoneye Apr 7, 2026
f7b80b2
fix: final polish on 10 task descriptions
neoneye Apr 7, 2026
41c8d48
fix: differentiate the 6 constraint checker descriptions
neoneye Apr 7, 2026
72874bd
refactor: rename upstream_stages to depends_on in DAG JSON output
neoneye Apr 7, 2026
1101fd4
feat: wrap DAG output in a top-level schema object
neoneye Apr 7, 2026
ef9213e
refactor: rename "name" to "id" in DAG JSON stage objects
neoneye Apr 7, 2026
26f8b3b
fix: mention PlanExe in pipeline description
neoneye Apr 7, 2026
9b6044b
fix: update pipeline description to mention DAG and AI-driven planning
neoneye Apr 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 244 additions & 0 deletions worker_plan/worker_plan_internal/extract_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
"""Extract the pipeline DAG from Luigi task introspection.

Walks the FullPlanPipeline task graph via requires()/output() and produces
a JSON description of every stage: name, output files, upstream stages,
and source code files. This replaces the hand-maintained registry with a
generated artifact that stays in sync with the actual pipeline code.

Usage:
cd worker_plan
python -m worker_plan_internal.extract_dag
python -m worker_plan_internal.extract_dag --output pipeline_dag.json
"""
import inspect
import json
import re
import sys
from pathlib import Path
from typing import Any

import luigi

_WORKER_PLAN_DIR = Path(__file__).resolve().parent.parent # worker_plan/

# Module prefixes that are infrastructure/utilities, not implementation logic.
# Imports from these are excluded from source_files auto-detection.
_INFRASTRUCTURE_PREFIXES = (
"worker_plan_internal.plan.stages.",
"worker_plan_internal.plan.run_plan_pipeline",
"worker_plan_internal.plan.pipeline_environment",
"worker_plan_internal.plan.ping_llm",
"worker_plan_internal.llm_util.",
"worker_plan_internal.llm_factory",
"worker_plan_internal.luigi_util.",
"worker_plan_internal.utils.",
"worker_plan_internal.format_",
"worker_plan_api.",
)


def _class_name_to_stage_name(class_name: str) -> str:
"""Convert CamelCase task class name to snake_case stage name.

Removes the 'Task' suffix, then converts CamelCase → snake_case.

Examples:
PotentialLeversTask → potential_levers
SWOTAnalysisTask → swot_analysis
WBSProjectLevel1AndLevel2Task → wbs_project_level1_and_level2
GovernancePhase1AuditTask → governance_phase1_audit
"""
name = class_name.removesuffix("Task")
# Insert underscore between lowercase/digit and uppercase
name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
# Insert underscore between consecutive uppercase run and uppercase+lowercase
name = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
return name.lower()


def _extract_output_filenames(task: luigi.Task) -> list[str]:
"""Extract output filenames (basenames) from a task's output() method."""
try:
outputs = task.output()
except Exception:
return []

targets: list[Any] = []
if isinstance(outputs, dict):
targets = list(outputs.values())
elif isinstance(outputs, (list, tuple)):
targets = list(outputs)
else:
targets = [outputs]

filenames: list[str] = []
for target in targets:
if hasattr(target, "path"):
filenames.append(Path(target.path).name)
return filenames


def _extract_upstream_tasks(task: luigi.Task) -> list[luigi.Task]:
"""Extract upstream task instances from a task's requires() method."""
try:
deps = task.requires()
except Exception:
return []

if deps is None:
return []
if isinstance(deps, dict):
return list(deps.values())
if isinstance(deps, (list, tuple)):
return list(deps)
if isinstance(deps, luigi.Task):
return [deps]
return []


def _detect_implementation_files(cls: type) -> list[str]:
"""Auto-detect implementation source files from module-level imports.

Scans the module that defines *cls* for classes and functions imported
from ``worker_plan_internal.*`` that are NOT infrastructure (stages,
LLM utilities, API types, etc.). Returns paths relative to worker_plan/.
"""
module = inspect.getmodule(cls)
if module is None:
return []

files: list[str] = []
seen_modules: set[str] = set()

for attr_name in dir(module):
obj = getattr(module, attr_name, None)
if obj is None or not (inspect.isclass(obj) or inspect.isfunction(obj)):
continue

obj_module_name = getattr(obj, "__module__", "") or ""
if not obj_module_name.startswith("worker_plan_internal."):
continue
if any(obj_module_name.startswith(p) for p in _INFRASTRUCTURE_PREFIXES):
continue
if obj_module_name in seen_modules:
continue
seen_modules.add(obj_module_name)

try:
obj_file = Path(inspect.getfile(obj)).resolve()
rel = str(obj_file.relative_to(_WORKER_PLAN_DIR))
if rel not in files:
files.append(rel)
except (TypeError, ValueError, OSError):
continue

return files


def _extract_source_files(task: luigi.Task) -> list[str]:
"""Get source files: task's own file + auto-detected implementation files."""
cls = type(task)

# The task's own file
result: list[str] = []
try:
task_file = Path(inspect.getfile(cls)).resolve()
result.append(str(task_file.relative_to(_WORKER_PLAN_DIR)))
except (TypeError, ValueError, OSError):
pass

# Supplement with auto-detected implementation files
for f in _detect_implementation_files(cls):
if f not in result:
result.append(f)

return result


def _output_sort_key(stage: dict[str, Any]) -> tuple[int, int, str]:
"""Sort key: numeric prefix from the first output filename, then name."""
filename = stage["output_files"][0] if stage.get("output_files") else ""
match = re.match(r"(\d+)-?(\d+)?", filename)
if match:
major = int(match.group(1))
minor = int(match.group(2)) if match.group(2) else 0
return (major, minor, stage["id"])
return (9999, 0, stage["id"])


def extract_dag() -> dict[str, Any]:
"""Walk the FullPlanPipeline task graph and extract DAG info.

Returns a top-level schema object with stages sorted by pipeline order.
"""
from worker_plan_internal.plan.stages.full_plan_pipeline import FullPlanPipeline

root = FullPlanPipeline(run_id_dir=Path("/tmp/_dag_extract_dummy"))

stages: list[dict[str, Any]] = []
visited: set[str] = set()

def _walk(task: luigi.Task) -> None:
class_name = task.__class__.__name__
if class_name in visited:
return
visited.add(class_name)

upstream_tasks = _extract_upstream_tasks(task)

# Recurse into dependencies first (depth-first)
for dep in upstream_tasks:
_walk(dep)

# Skip the orchestrator itself
if class_name == "FullPlanPipeline":
return

cls = type(task)
stage_name = _class_name_to_stage_name(class_name)
description = cls.description() if hasattr(cls, "description") else ""
output_files = _extract_output_filenames(task)
source_files = _extract_source_files(task)
depends_on_names = sorted(set(
_class_name_to_stage_name(dep.__class__.__name__)
for dep in upstream_tasks
))

stages.append({
"id": stage_name,
"description": description,
"output_files": output_files,
"depends_on": depends_on_names,
"source_files": source_files,
})

_walk(root)

stages.sort(key=_output_sort_key)

return {
"schema_version": "1.0",
"pipeline_name": "planning_pipeline",
"description": "DAG for PlanExe, an AI-driven project planning system.",
"stages": stages,
}


def main() -> None:
output_path = None
args = sys.argv[1:]
if len(args) >= 2 and args[0] == "--output":
output_path = args[1]

dag = extract_dag()
dag_json = json.dumps(dag, indent=2, ensure_ascii=False)

if output_path:
Path(output_path).write_text(dag_json + "\n", encoding="utf-8")
print(f"Wrote {len(dag['stages'])} stages to {output_path}", file=sys.stderr)
else:
print(dag_json)


if __name__ == "__main__":
main()
12 changes: 12 additions & 0 deletions worker_plan/worker_plan_internal/plan/run_plan_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ class PlanTask(luigi.Task):
# If the callback is not provided, the pipeline will run until completion.
_pipeline_executor_callback = luigi.Parameter(default=None, significant=False, visibility=luigi.parameter.ParameterVisibility.PRIVATE)

@classmethod
def description(cls) -> str:
"""Brief description of what this task does.

Default returns the first line of the class docstring.
Override in subclasses for a custom description.
"""
doc = cls.__doc__
if doc:
return doc.strip().split("\n")[0].strip()
return ""

def file_path(self, filename: FilenameEnum) -> Path:
return self.run_id_dir / filename.value

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@


class CandidateScenariosTask(PlanTask):
"""
Combinations of the vital few levers.
"""
"""Generate aggressive, moderate, and conservative scenarios from the vital few levers."""
def requires(self):
return {
'setup': self.clone(SetupTask),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@


class ConsolidateAssumptionsMarkdownTask(PlanTask):
"""
Combines multiple small markdown documents into a single big document.
"""
"""Merge locations, currency, risks, and assumption stages into one reference document."""
def requires(self):
return {
'identify_purpose': self.clone(IdentifyPurposeTask),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@


class ConsolidateGovernanceTask(PlanTask):
"""Consolidate all governance phases into a single markdown document."""

def requires(self):
return {
'governance_phase1_audit': self.clone(GovernancePhase1AuditTask),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _read_constraints_json(task: PlanTask) -> str:


class PotentialLeversConstraintTask(PlanTask):
"""Check potential levers output for constraint violations."""
"""Guardrail: verify brainstormed levers respect the user's constraints."""
def requires(self):
return {
'extract_constraints': self.clone(ExtractConstraintsTask),
Expand All @@ -49,7 +49,7 @@ def run_with_llm(self, llm: LLM) -> None:


class DeduplicatedLeversConstraintTask(PlanTask):
"""Check deduplicated levers output for constraint violations."""
"""Guardrail: verify triaged levers still respect the user's constraints."""
def requires(self):
return {
'extract_constraints': self.clone(ExtractConstraintsTask),
Expand All @@ -68,7 +68,7 @@ def run_with_llm(self, llm: LLM) -> None:


class EnrichedLeversConstraintTask(PlanTask):
"""Check enriched levers output for constraint violations."""
"""Guardrail: verify enriched levers still respect the user's constraints."""
def requires(self):
return {
'extract_constraints': self.clone(ExtractConstraintsTask),
Expand All @@ -87,7 +87,7 @@ def run_with_llm(self, llm: LLM) -> None:


class VitalFewLeversConstraintTask(PlanTask):
"""Check vital few levers output for constraint violations."""
"""Guardrail: verify the selected vital levers respect the user's constraints."""
def requires(self):
return {
'extract_constraints': self.clone(ExtractConstraintsTask),
Expand All @@ -106,7 +106,7 @@ def run_with_llm(self, llm: LLM) -> None:


class CandidateScenariosConstraintTask(PlanTask):
"""Check candidate scenarios output for constraint violations."""
"""Guardrail: verify generated scenarios respect the user's constraints."""
def requires(self):
return {
'extract_constraints': self.clone(ExtractConstraintsTask),
Expand All @@ -125,7 +125,7 @@ def run_with_llm(self, llm: LLM) -> None:


class SelectedScenarioConstraintTask(PlanTask):
"""Check selected scenario output for constraint violations."""
"""Guardrail: verify the chosen scenario respects the user's constraints before planning begins."""
def requires(self):
return {
'extract_constraints': self.clone(ExtractConstraintsTask),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@


class ConvertPitchToMarkdownTask(PlanTask):
"""
Human readable version of the pitch.

This task depends on:
- CreatePitchTask: Creates the pitch JSON.
"""
"""Convert the raw pitch JSON into a polished, scannable markdown document."""
def output(self):
return {
'raw': self.local_target(FilenameEnum.PITCH_CONVERT_TO_MARKDOWN_RAW),
Expand Down
10 changes: 1 addition & 9 deletions worker_plan/worker_plan_internal/plan/stages/create_pitch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,7 @@


class CreatePitchTask(PlanTask):
"""
Create a the pitch that explains the project plan, from multiple perspectives.

This task depends on:
- ProjectPlanTask: provides the project plan JSON.
- WBSProjectLevel1AndLevel2Task: containing the top level of the project plan.

The resulting pitch JSON is written to the file specified by FilenameEnum.PITCH.
"""
"""Create a compelling project pitch with target audience, call to action, and risk mitigation."""
def output(self):
return self.local_target(FilenameEnum.PITCH_RAW)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@


class CreateScheduleTask(PlanTask):
"""Build the project schedule and generate Gantt charts."""

def output(self):
return {
'dhtmlx_html': self.local_target(FilenameEnum.SCHEDULE_GANTT_DHTMLX_HTML),
Expand Down
Loading