diff --git a/docs/superpowers/plans/2026-04-05-rca.md b/docs/superpowers/plans/2026-04-05-rca.md
new file mode 100644
index 00000000..8b977049
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-05-rca.md
@@ -0,0 +1,1498 @@
+# Root Cause Analysis (RCA) Implementation Plan
+
+> **Historical note:** This plan was written under the name "flaw tracer". The module
+> has been renamed to `rca` (root cause analysis) — all paths that previously referenced `flaw_tracer`
+> in this document now correspond to `rca`.
+> The static DAG registry described here has since been replaced by `extract_dag.py`
+> which introspects the Luigi task graph at import time.
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Build a CLI tool that traces problems in PlanExe reports upstream through the pipeline DAG to find root causes.
+
+**Architecture:** Recursive depth-first search using a DAG registry built from Luigi task introspection. Three LLM prompts (problem identification, upstream check, source code analysis) use Pydantic structured output via LLMExecutor. Produces JSON + markdown reports. 
+ +**Tech Stack:** Python 3.13, llama-index LLM infrastructure, Pydantic v2, argparse, pytest + +--- + +## File Structure + +``` +worker_plan/worker_plan_internal/rca/ + __init__.py — Package init, exports RootCauseAnalyzer + registry.py — DAG mapping (auto-generated from Luigi task introspection) + prompts.py — 3 Pydantic models + 3 prompt builders + tracer.py — Recursive tracing algorithm (RootCauseAnalyzer class) + output.py — JSON + markdown report generation + __main__.py — CLI entry point (argparse) + tests/ + __init__.py + test_registry.py — Lookup function tests + test_prompts.py — Prompt construction tests + test_tracer.py — Tracing algorithm tests with mock LLM + test_output.py — Report generation tests +``` + +--- + +### Task 1: Registry — DAG Mapping + +**Files:** +- Create: `worker_plan/worker_plan_internal/rca/__init__.py` +- Create: `worker_plan/worker_plan_internal/rca/registry.py` +- Create: `worker_plan/worker_plan_internal/rca/tests/__init__.py` +- Create: `worker_plan/worker_plan_internal/rca/tests/test_registry.py` + +- [ ] **Step 1: Write the failing tests** + +```python +# worker_plan/worker_plan_internal/rca/tests/test_registry.py +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory +from worker_plan_internal.rca.registry import ( + NodeInfo, + NODES, + find_node_by_filename, + get_upstream_files, + get_source_code_paths, +) + + +class TestNodeInfo(unittest.TestCase): + def test_stages_is_nonempty(self): + self.assertGreater(len(NODES), 40) + + def test_all_stages_have_required_fields(self): + for node in NODES: + self.assertIsInstance(node.name, str, f"{node.name} name") + self.assertIsInstance(node.output_files, list, f"{node.name} output_files") + self.assertTrue(len(node.output_files) > 0, f"{node.name} has no output_files") + self.assertIsInstance(node.inputs, list, f"{node.name} inputs") + self.assertIsInstance(node.source_code_files, list, f"{node.name} source_code_files") + 
self.assertIsInstance(node.primary_output, str, f"{node.name} primary_output")
+        self.assertIn(node.primary_output, node.output_files, f"{node.name} primary_output not in output_files")
+
+    def test_no_duplicate_node_names(self):
+        names = [s.name for s in NODES]
+        self.assertEqual(len(names), len(set(names)))
+
+    def test_upstream_references_are_valid(self):
+        valid_names = {s.name for s in NODES}
+        for node in NODES:
+            for upstream in node.inputs:
+                self.assertIn(upstream, valid_names, f"{node.name} references unknown upstream '{upstream}'")
+
+
+class TestFindStageByFilename(unittest.TestCase):
+    def test_find_report(self):
+        stage = find_node_by_filename("030-report.html")
+        self.assertIsNotNone(stage)
+        self.assertEqual(stage.name, "report")
+
+    def test_find_potential_levers_clean(self):
+        stage = find_node_by_filename("002-10-potential_levers.json")
+        self.assertIsNotNone(stage)
+        self.assertEqual(stage.name, "potential_levers")
+
+    def test_find_potential_levers_raw(self):
+        stage = find_node_by_filename("002-9-potential_levers_raw.json")
+        self.assertIsNotNone(stage)
+        self.assertEqual(stage.name, "potential_levers")
+
+    def test_find_executive_summary(self):
+        stage = find_node_by_filename("025-2-executive_summary.md")
+        self.assertIsNotNone(stage)
+        self.assertEqual(stage.name, "executive_summary")
+
+    def test_unknown_filename_returns_none(self):
+        stage = find_node_by_filename("zzz-unknown.txt")
+        self.assertIsNone(stage)
+
+
+class TestGetUpstreamFiles(unittest.TestCase):
+    def test_setup_has_no_upstream(self):
+        with TemporaryDirectory() as d:
+            result = get_upstream_files("setup", Path(d))
+            self.assertEqual(result, [])
+
+    def test_potential_levers_upstream(self):
+        with TemporaryDirectory() as d:
+            output_dir = Path(d)
+            # Create the expected upstream files on disk
+            (output_dir / "001-2-plan.txt").write_text("plan", encoding="utf-8")
+            (output_dir / "002-6-identify_purpose.md").write_text("purpose", encoding="utf-8")
+            (output_dir / 
"002-8-plan_type.md").write_text("type", encoding="utf-8") + (output_dir / "002-0-extract_constraints.md").write_text("constraints", encoding="utf-8") + + result = get_upstream_files("potential_levers", output_dir) + node_names = [name for name, _ in result] + self.assertIn("setup", node_names) + self.assertIn("identify_purpose", node_names) + self.assertIn("plan_type", node_names) + self.assertIn("extract_constraints", node_names) + + def test_missing_files_are_skipped(self): + with TemporaryDirectory() as d: + output_dir = Path(d) + # Only create one of the upstream files + (output_dir / "001-2-plan.txt").write_text("plan", encoding="utf-8") + + result = get_upstream_files("potential_levers", output_dir) + node_names = [name for name, _ in result] + self.assertIn("setup", node_names) + # The others should be skipped because their files don't exist + self.assertNotIn("identify_purpose", node_names) + + +class TestGetSourceCodePaths(unittest.TestCase): + def test_potential_levers_source(self): + paths = get_source_code_paths("potential_levers") + filenames = [p.name for p in paths] + self.assertIn("potential_levers.py", filenames) + self.assertIn("identify_potential_levers.py", filenames) + + def test_unknown_stage_returns_empty(self): + paths = get_source_code_paths("nonexistent_stage") + self.assertEqual(paths, []) +``` + +- [ ] **Step 2: Create package init files** + +```python +# worker_plan/worker_plan_internal/rca/__init__.py +"""RCA — Root-cause analysis for PlanExe reports.""" +``` + +```python +# worker_plan/worker_plan_internal/rca/tests/__init__.py +``` + +- [ ] **Step 3: Run tests to verify they fail** + +Run: `cd worker_plan && python -m pytest worker_plan_internal/rca/tests/test_registry.py -v` +Expected: FAIL with `ModuleNotFoundError` or `ImportError` + +- [ ] **Step 4: Implement registry.py** + +```python +# worker_plan/worker_plan_internal/rca/registry.py +"""Static DAG mapping for the PlanExe pipeline. 
+ +Maps every pipeline stage to its output files, upstream dependencies, +and source code files. Derived from the Luigi task classes in +worker_plan_internal/plan/stages/. +""" +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +# Base path for source code, relative to worker_plan/ +_SOURCE_BASE = Path(__file__).resolve().parent.parent.parent # worker_plan/ + + +@dataclass(frozen=True) +class NodeInfo: + """One pipeline stage.""" + name: str + output_files: list[str] + primary_output: str # preferred file to read when checking for problems + inputs: list[str] = field(default_factory=list) + source_code_files: list[str] = field(default_factory=list) + + +# ── Pipeline registry (auto-generated at import time) ───────────────── +# The registry is built by extract_dag.py which introspects the Luigi +# task graph. See extract_dag.py for the full ~70-node DAG. +# Example entry: +NODES: tuple[NodeInfo, ...] = _build_registry() # see registry.py + +# Example NodeInfo: +# NodeInfo( +# name="potential_levers", +# output_files=("002-9-potential_levers_raw.json", "002-10-potential_levers.json"), +# inputs=(NodeInput(from_node="setup", artifact_path="001-2-plan.txt"), ...), +# source_code_files=("worker_plan_internal/plan/nodes/potential_levers.py", ...), +# ) + +_NODE_BY_NAME: dict[str, NodeInfo] = {n.name: n for n in NODES} +_NODE_BY_FILENAME: dict[str, NodeInfo] = {} +for _node in NODES: + for _fname in _node.output_files: + _NODE_BY_FILENAME[_fname] = _node + + +def find_node_by_filename(filename: str) -> NodeInfo | None: + return _NODE_BY_FILENAME.get(filename) + + +def get_upstream_files(node_name: str, output_dir: Path) -> list[tuple[str, Path]]: + node = _NODE_BY_NAME.get(node_name) + if node is None: + return [] + result = [] + for inp in node.inputs: + artifact_path = output_dir / inp.artifact_path + if artifact_path.exists(): + result.append((inp.from_node, artifact_path)) + return result + + +def 
get_source_code_paths(node_name: str) -> list[Path]: + node = _NODE_BY_NAME.get(node_name) + if node is None: + return [] + return [_SOURCE_BASE / f for f in node.source_code_files] +``` +- [ ] **Step 5: Run tests to verify they pass** + +Run: `cd worker_plan && python -m pytest worker_plan_internal/rca/tests/test_registry.py -v` +Expected: All tests PASS + +- [ ] **Step 6: Commit** + +```bash +git add worker_plan/worker_plan_internal/rca/__init__.py worker_plan/worker_plan_internal/rca/registry.py worker_plan/worker_plan_internal/rca/tests/__init__.py worker_plan/worker_plan_internal/rca/tests/test_registry.py +git commit -m "feat: add rca registry with full pipeline DAG mapping" +``` + +--- + +### Task 2: Prompts — Pydantic Models and Prompt Builders + +**Files:** +- Create: `worker_plan/worker_plan_internal/rca/prompts.py` +- Create: `worker_plan/worker_plan_internal/rca/tests/test_prompts.py` + +- [ ] **Step 1: Write the failing tests** + +```python +# worker_plan/worker_plan_internal/rca/tests/test_prompts.py +import unittest +from llama_index.core.llms import ChatMessage, MessageRole +from worker_plan_internal.rca.prompts import ( + IdentifiedProblem, + ProblemIdentificationResult, + UpstreamCheckResult, + SourceCodeAnalysisResult, + build_problem_identification_messages, + build_upstream_check_messages, + build_source_code_analysis_messages, +) + + +class TestPydanticModels(unittest.TestCase): + def test_identified_problem_valid(self): + problem = IdentifiedProblem( + description="Budget figure is fabricated", + evidence="The budget is CZK 500,000", + severity="HIGH", + ) + self.assertEqual(problem.severity, "HIGH") + + def test_identified_problem_rejects_invalid_severity(self): + with self.assertRaises(Exception): + IdentifiedProblem( + description="test", + evidence="test", + severity="CRITICAL", + ) + + def test_problem_identification_result(self): + result = ProblemIdentificationResult(problems=[ + IdentifiedProblem(description="test", evidence="quote", 
severity="LOW"), + ]) + self.assertEqual(len(result.problems), 1) + + def test_upstream_check_result_found(self): + result = UpstreamCheckResult(found=True, evidence="quote", explanation="precursor") + self.assertTrue(result.found) + self.assertEqual(result.evidence, "quote") + + def test_upstream_check_result_not_found(self): + result = UpstreamCheckResult(found=False, evidence=None, explanation="clean") + self.assertFalse(result.found) + + def test_source_code_analysis_result(self): + result = SourceCodeAnalysisResult( + likely_cause="prompt lacks validation", + relevant_code_section="system_prompt = ...", + suggestion="add grounding check", + ) + self.assertIsInstance(result.likely_cause, str) + + +class TestBuildProblemIdentificationMessages(unittest.TestCase): + def test_returns_chat_messages(self): + messages = build_problem_identification_messages( + filename="030-report.html", + file_content="report content", + user_problem_description="budget is wrong", + ) + self.assertIsInstance(messages, list) + self.assertEqual(len(messages), 2) + self.assertEqual(messages[0].role, MessageRole.SYSTEM) + self.assertEqual(messages[1].role, MessageRole.USER) + + def test_user_message_contains_inputs(self): + messages = build_problem_identification_messages( + filename="025-2-executive_summary.md", + file_content="# Summary\nBudget: 500k", + user_problem_description="fabricated budget", + ) + user_content = messages[1].content + self.assertIn("025-2-executive_summary.md", user_content) + self.assertIn("# Summary", user_content) + self.assertIn("fabricated budget", user_content) + + +class TestBuildUpstreamCheckMessages(unittest.TestCase): + def test_returns_chat_messages(self): + messages = build_upstream_check_messages( + problem_description="Budget is fabricated", + evidence_quote="CZK 500,000", + upstream_filename="005-2-project_plan.md", + upstream_file_content="# Project Plan\nBudget: 500k", + ) + self.assertIsInstance(messages, list) + self.assertEqual(len(messages), 
2) + + def test_user_message_contains_problem_and_upstream(self): + messages = build_upstream_check_messages( + problem_description="Missing market sizing", + evidence_quote="growing Czech market", + upstream_filename="003-5-make_assumptions.md", + upstream_file_content="# Assumptions\nMarket is growing", + ) + user_content = messages[1].content + self.assertIn("Missing market sizing", user_content) + self.assertIn("growing Czech market", user_content) + self.assertIn("003-5-make_assumptions.md", user_content) + + +class TestBuildSourceCodeAnalysisMessages(unittest.TestCase): + def test_returns_chat_messages(self): + messages = build_source_code_analysis_messages( + problem_description="Budget fabricated", + evidence_quote="CZK 500,000", + source_code_contents=[ + ("stages/make_assumptions.py", "class MakeAssumptionsTask: ..."), + ("assume/make_assumptions.py", "def execute(llm, query): ..."), + ], + ) + self.assertIsInstance(messages, list) + self.assertEqual(len(messages), 2) + + def test_user_message_contains_source_code(self): + messages = build_source_code_analysis_messages( + problem_description="Missing analysis", + evidence_quote="no data", + source_code_contents=[ + ("my_stage.py", "SYSTEM_PROMPT = 'Generate assumptions'"), + ], + ) + user_content = messages[1].content + self.assertIn("my_stage.py", user_content) + self.assertIn("SYSTEM_PROMPT", user_content) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd worker_plan && python -m pytest worker_plan_internal/rca/tests/test_prompts.py -v` +Expected: FAIL with `ImportError` + +- [ ] **Step 3: Implement prompts.py** + +```python +# worker_plan/worker_plan_internal/rca/prompts.py +"""Pydantic models and prompt builders for RCA.""" +from typing import Literal +from pydantic import BaseModel, Field +from llama_index.core.llms import ChatMessage, MessageRole + + +# ── Pydantic models for structured LLM output ────────────────────────── + +class IdentifiedProblem(BaseModel): + """A discrete 
problem found in a pipeline output file.""" + description: str = Field(description="One-sentence description of the problem") + evidence: str = Field(description="Direct quote from the file demonstrating the problem") + severity: Literal["HIGH", "MEDIUM", "LOW"] = Field( + description="HIGH: fabricated data or missing critical analysis. MEDIUM: weak reasoning or vague claims. LOW: minor gaps." + ) + + +class ProblemIdentificationResult(BaseModel): + """Result of analyzing a file for problems.""" + problems: list[IdentifiedProblem] = Field(description="List of discrete problems found in the file") + + +class UpstreamCheckResult(BaseModel): + """Result of checking an upstream file for a problem precursor.""" + found: bool = Field(description="True if this file contains the problem or a precursor to it") + evidence: str | None = Field(description="Direct quote from the file if found, null otherwise") + explanation: str = Field(description="How this connects to the downstream problem, or why this file is clean") + + +class SourceCodeAnalysisResult(BaseModel): + """Result of analyzing source code at a problem's origin stage.""" + likely_cause: str = Field(description="What in the prompt or logic likely caused the problem") + relevant_code_section: str = Field(description="The specific code or prompt text responsible") + suggestion: str = Field(description="How to fix or prevent this problem") + + +# ── Prompt builders ───────────────────────────────────────────────────── + +def build_problem_identification_messages( + filename: str, + file_content: str, + user_problem_description: str, +) -> list[ChatMessage]: + """Build messages for Phase 1: identifying discrete problems in a file.""" + system = ( + "You are analyzing an intermediary file from a project planning pipeline.\n" + "The user has identified problems in this output. 
Identify each discrete problem.\n"
+        "For each problem, provide a short description (one sentence), a direct quote "
+        "from the file as evidence, and a severity level.\n"
+        "Only identify real problems — do not flag stylistic preferences or minor formatting issues.\n"
+        "Severity levels:\n"
+        "- HIGH: fabricated data, invented statistics, or missing critical analysis\n"
+        "- MEDIUM: weak reasoning, vague unsupported claims, or shallow treatment\n"
+        "- LOW: minor gaps that don't significantly impact the plan"
+    )
+    user = (
+        f"User's observation:\n{user_problem_description}\n\n"
+        f"Filename: {filename}\n"
+        f"File content:\n{file_content}"
+    )
+    return [
+        ChatMessage(role=MessageRole.SYSTEM, content=system),
+        ChatMessage(role=MessageRole.USER, content=user),
+    ]
+
+
+def build_upstream_check_messages(
+    problem_description: str,
+    evidence_quote: str,
+    upstream_filename: str,
+    upstream_file_content: str,
+) -> list[ChatMessage]:
+    """Build messages for Phase 2: checking if a problem exists in an upstream file."""
+    system = (
+        "You are tracing a problem through a project planning pipeline to find where it originated.\n"
+        "A downstream file contains a problem. You are examining an upstream file that was an input "
+        "to the stage that produced the problematic output.\n"
+        "Determine if this upstream file contains the same problem or a precursor to it.\n"
+        "If YES: quote the relevant passage and explain how it connects to the downstream problem.\n"
+        "If NO: explain why this file is clean regarding this specific problem." 
+ ) + user = ( + f"Problem: {problem_description}\n" + f"Evidence from downstream: {evidence_quote}\n\n" + f"Upstream filename: {upstream_filename}\n" + f"Upstream file content:\n{upstream_file_content}" + ) + return [ + ChatMessage(role=MessageRole.SYSTEM, content=system), + ChatMessage(role=MessageRole.USER, content=user), + ] + + +def build_source_code_analysis_messages( + problem_description: str, + evidence_quote: str, + source_code_contents: list[tuple[str, str]], +) -> list[ChatMessage]: + """Build messages for Phase 3: analyzing source code at problem origin. + + Args: + source_code_contents: list of (filename, content) tuples + """ + system = ( + "A problem was introduced at this pipeline stage. The problem exists in its output " + "but NOT in any of its inputs, so this stage created it.\n" + "Examine the source code to identify what in the prompt text, logic, or processing " + "likely caused this problem. Be specific — point to lines or prompt phrases.\n" + "Focus on the system prompt text and the data transformation logic." 
+ ) + source_sections = [] + for fname, content in source_code_contents: + source_sections.append(f"--- {fname} ---\n{content}") + source_text = "\n\n".join(source_sections) + + user = ( + f"Problem: {problem_description}\n" + f"Evidence from output: {evidence_quote}\n\n" + f"Source code files:\n{source_text}" + ) + return [ + ChatMessage(role=MessageRole.SYSTEM, content=system), + ChatMessage(role=MessageRole.USER, content=user), + ] +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd worker_plan && python -m pytest worker_plan_internal/rca/tests/test_prompts.py -v` +Expected: All tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add worker_plan/worker_plan_internal/rca/prompts.py worker_plan/worker_plan_internal/rca/tests/test_prompts.py +git commit -m "feat: add rca Pydantic models and prompt builders" +``` + +--- + +### Task 3: Tracer — Recursive Algorithm + +**Files:** +- Create: `worker_plan/worker_plan_internal/rca/tracer.py` +- Create: `worker_plan/worker_plan_internal/rca/tests/test_tracer.py` + +- [ ] **Step 1: Write the failing tests** + +```python +# worker_plan/worker_plan_internal/rca/tests/test_tracer.py +import json +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory + +from worker_plan_internal.rca.tracer import RootCauseAnalyzer, RCAResult, TracedProblem, TraceEntry +from worker_plan_internal.llm_util.response_mockllm import ResponseMockLLM +from worker_plan_internal.llm_util.llm_executor import LLMExecutor, LLMModelWithInstance + + +def _make_executor(responses: list[str]) -> LLMExecutor: + """Create an LLMExecutor backed by a mock LLM with pre-set responses.""" + llm = ResponseMockLLM(responses=responses) + llm_models = LLMModelWithInstance.from_instances([llm]) + return LLMExecutor(llm_models=llm_models) + + +class TestRCAResult(unittest.TestCase): + def test_dataclass_creation(self): + result = RCAResult( + starting_file="030-report.html", + problem_description="test", + 
output_dir="/tmp/test", + problems=[], + llm_calls_made=0, + ) + self.assertEqual(result.starting_file, "030-report.html") + self.assertEqual(len(result.problems), 0) + + +class TestRootCauseAnalyzerPhase1(unittest.TestCase): + """Test problem identification (Phase 1) using mock LLM.""" + + def test_identify_problems(self): + """The tracer should parse LLM output into IdentifiedProblem objects.""" + with TemporaryDirectory() as d: + output_dir = Path(d) + # Create a minimal output file + report_file = output_dir / "025-2-executive_summary.md" + report_file.write_text("# Summary\nBudget: CZK 500,000", encoding="utf-8") + # Create upstream file so trace can proceed + (output_dir / "005-2-project_plan.md").write_text("# Plan", encoding="utf-8") + + # Mock LLM response for problem identification (Phase 1) + problem_response = json.dumps({ + "problems": [ + { + "description": "Budget is unvalidated", + "evidence": "CZK 500,000", + "severity": "HIGH", + } + ] + }) + # Mock LLM response for upstream check (Phase 2) — not found, so origin is here + upstream_response = json.dumps({ + "found": False, + "evidence": None, + "explanation": "No budget mentioned upstream", + }) + # Mock LLM response for source code analysis (Phase 3) + source_response = json.dumps({ + "likely_cause": "Prompt asks for budget without data", + "relevant_code_section": "system_prompt = ...", + "suggestion": "Add validation step", + }) + + executor = _make_executor([problem_response, upstream_response, source_response]) + source_base = Path(__file__).resolve().parent.parent.parent.parent # worker_plan/ + tracer = RootCauseAnalyzer( + output_dir=output_dir, + llm_executor=executor, + source_code_base=source_base, + max_depth=15, + verbose=False, + ) + result = tracer.trace("025-2-executive_summary.md", "budget is unvalidated") + + self.assertIsInstance(result, RCAResult) + self.assertGreaterEqual(len(result.problems), 1) + problem = result.problems[0] + self.assertEqual(problem.description, "Budget is 
unvalidated") + self.assertEqual(problem.severity, "HIGH") + + +class TestRootCauseAnalyzerUpstreamTrace(unittest.TestCase): + """Test upstream tracing (Phase 2) with a simple two-level chain.""" + + def test_traces_problem_upstream(self): + with TemporaryDirectory() as d: + output_dir = Path(d) + # Create files for a simple chain: executive_summary -> project_plan -> setup + (output_dir / "025-2-executive_summary.md").write_text("Budget: CZK 500,000", encoding="utf-8") + (output_dir / "005-2-project_plan.md").write_text("Budget: CZK 500,000", encoding="utf-8") + (output_dir / "001-2-plan.txt").write_text("Open a tea shop", encoding="utf-8") + # Create other upstream files that executive_summary depends on + (output_dir / "002-14-strategic_decisions.md").write_text("decisions", encoding="utf-8") + (output_dir / "002-19-scenarios.md").write_text("scenarios", encoding="utf-8") + (output_dir / "003-10-consolidate_assumptions_full.md").write_text("assumptions", encoding="utf-8") + + responses = [ + # Phase 1: identify problems in executive_summary + json.dumps({"problems": [{"description": "Budget fabricated", "evidence": "CZK 500,000", "severity": "HIGH"}]}), + # Phase 2: check each upstream of executive_summary + # strategic_decisions_markdown + json.dumps({"found": False, "evidence": None, "explanation": "clean"}), + # scenarios_markdown + json.dumps({"found": False, "evidence": None, "explanation": "clean"}), + # consolidate_assumptions_markdown + json.dumps({"found": False, "evidence": None, "explanation": "clean"}), + # project_plan — problem found here + json.dumps({"found": True, "evidence": "Budget: CZK 500,000", "explanation": "Budget originates here"}), + # Now trace project_plan's upstreams + # setup + json.dumps({"found": False, "evidence": None, "explanation": "clean"}), + # strategic_decisions_markdown (already checked, dedup) + # scenarios_markdown (already checked, dedup) + # consolidate_assumptions_markdown (already checked, dedup) + # 
pre_project_assessment (not on disk, skipped) + # Phase 3: source code analysis at project_plan (the origin) + json.dumps({"likely_cause": "Prompt generates budget", "relevant_code_section": "...", "suggestion": "fix"}), + ] + + executor = _make_executor(responses) + source_base = Path(__file__).resolve().parent.parent.parent.parent + tracer = RootCauseAnalyzer( + output_dir=output_dir, + llm_executor=executor, + source_code_base=source_base, + max_depth=15, + verbose=False, + ) + result = tracer.trace("025-2-executive_summary.md", "budget is fabricated") + + self.assertEqual(len(result.problems), 1) + problem = result.problems[0] + # The trace should include at least executive_summary and project_plan + trace_stages = [entry.stage for entry in problem.trace] + self.assertIn("executive_summary", trace_stages) + self.assertIn("project_plan", trace_stages) + # Origin should be project_plan (problem found there but not in its upstream) + self.assertEqual(problem.origin_stage, "project_plan") + + +class TestRootCauseAnalyzerMaxDepth(unittest.TestCase): + def test_respects_max_depth(self): + with TemporaryDirectory() as d: + output_dir = Path(d) + (output_dir / "025-2-executive_summary.md").write_text("Budget: 500k", encoding="utf-8") + + responses = [ + json.dumps({"problems": [{"description": "test problem", "evidence": "500k", "severity": "LOW"}]}), + ] + executor = _make_executor(responses) + source_base = Path(__file__).resolve().parent.parent.parent.parent + tracer = RootCauseAnalyzer( + output_dir=output_dir, + llm_executor=executor, + source_code_base=source_base, + max_depth=0, # zero depth = no upstream tracing + verbose=False, + ) + result = tracer.trace("025-2-executive_summary.md", "test") + + self.assertEqual(len(result.problems), 1) + # With max_depth=0, no upstream tracing happens + self.assertEqual(len(result.problems[0].trace), 1) # only the starting file +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd worker_plan && python -m 
pytest worker_plan_internal/rca/tests/test_tracer.py -v` +Expected: FAIL with `ImportError` + +- [ ] **Step 3: Implement tracer.py** + +```python +# worker_plan/worker_plan_internal/rca/tracer.py +"""Recursive depth-first root cause analyzer for PlanExe pipeline outputs.""" +import json +import logging +import sys +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +from llama_index.core.llms.llm import LLM + +from worker_plan_internal.rca.registry import ( + find_node_by_filename, + get_upstream_files, + get_source_code_paths, +) +from worker_plan_internal.rca.prompts import ( + ProblemIdentificationResult, + UpstreamCheckResult, + SourceCodeAnalysisResult, + build_problem_identification_messages, + build_upstream_check_messages, + build_source_code_analysis_messages, +) +from worker_plan_internal.llm_util.llm_executor import LLMExecutor + +logger = logging.getLogger(__name__) + + +@dataclass +class TraceEntry: + """One hop in a problem's upstream trace.""" + stage: str + file: str + evidence: str + is_origin: bool = False + + +@dataclass +class OriginInfo: + """Source code analysis at a problem's origin stage.""" + stage: str + file: str + source_code_files: list[str] + likely_cause: str + suggestion: str + + +@dataclass +class TracedProblem: + """A fully traced problem with its upstream chain.""" + id: str + description: str + severity: str + starting_evidence: str + trace: list[TraceEntry] + origin_stage: Optional[str] = None + origin: Optional[OriginInfo] = None + depth: int = 0 + trace_complete: bool = True + + +@dataclass +class RCAResult: + """Complete result of a root cause analysis run.""" + starting_file: str + problem_description: str + output_dir: str + problems: list[TracedProblem] + llm_calls_made: int = 0 + + +class RootCauseAnalyzer: + """Traces problems upstream through the PlanExe pipeline DAG.""" + + def __init__( + self, + output_dir: Path, + llm_executor: LLMExecutor, + source_code_base: Path, + 
max_depth: int = 15, + verbose: bool = False, + ): + self.output_dir = output_dir + self.llm_executor = llm_executor + self.source_code_base = source_code_base + self.max_depth = max_depth + self.verbose = verbose + self._llm_calls = 0 + self._checked: set[tuple[str, str]] = set() # (node_name, problem_description) dedup + + def trace(self, starting_file: str, problem_description: str) -> RCAResult: + """Main entry point. Identify problems and trace each upstream.""" + self._llm_calls = 0 + self._checked.clear() + + file_path = self.output_dir / starting_file + if not file_path.exists(): + raise FileNotFoundError(f"Starting file not found: {file_path}") + + file_content = file_path.read_text(encoding="utf-8") + stage = find_node_by_filename(starting_file) + node_name = node.name if stage else "unknown" + + # Phase 1: Identify problems + self._log(f"Phase 1: Identifying problems in {starting_file}") + identified = self._identify_problems(starting_file, file_content, problem_description) + self._log(f" Found {len(identified.problems)} problem(s)") + + traced_problems: list[TracedProblem] = [] + for i, problem in enumerate(identified.problems): + problem_id = f"problem_{i + 1:03d}" + self._log(f"\nTracing {problem_id}: {problem.description}") + + starting_entry = TraceEntry( + stage=node_name, + file=starting_file, + evidence=problem.evidence, + is_origin=False, + ) + + traced = TracedProblem( + id=problem_id, + description=problem.description, + severity=problem.severity, + starting_evidence=problem.evidence, + trace=[starting_entry], + ) + + if stage and self.max_depth > 0: + self._trace_upstream(traced, node_name, problem.description, problem.evidence, depth=0) + + # Mark the last trace entry as origin if no deeper origin was found + if traced.origin_stage is None and traced.trace: + last = traced.trace[-1] + last.is_origin = True + traced.origin_stage = last.stage + traced.depth = len(traced.trace) - 1 + + # Phase 3: Source code analysis at origin + 
self._analyze_source_code(traced, last.stage, problem.description, last.evidence) + + traced_problems.append(traced) + + # Sort by depth (deepest origin first) + traced_problems.sort(key=lambda f: f.depth, reverse=True) + + return RCAResult( + starting_file=starting_file, + problem_description=problem_description, + output_dir=str(self.output_dir), + problems=traced_problems, + llm_calls_made=self._llm_calls, + ) + + def _identify_problems(self, filename: str, file_content: str, user_description: str) -> ProblemIdentificationResult: + """Phase 1: Ask LLM to identify discrete problems in the starting file.""" + messages = build_problem_identification_messages(filename, file_content, user_description) + + def execute(llm: LLM) -> ProblemIdentificationResult: + sllm = llm.as_structured_llm(ProblemIdentificationResult) + response = sllm.chat(messages) + return response.raw + + self._llm_calls += 1 + return self.llm_executor.run(execute) + + def _check_upstream(self, problem_description: str, evidence: str, upstream_filename: str, upstream_content: str) -> UpstreamCheckResult: + """Phase 2: Ask LLM if a problem exists in an upstream file.""" + messages = build_upstream_check_messages(problem_description, evidence, upstream_filename, upstream_content) + + def execute(llm: LLM) -> UpstreamCheckResult: + sllm = llm.as_structured_llm(UpstreamCheckResult) + response = sllm.chat(messages) + return response.raw + + self._llm_calls += 1 + return self.llm_executor.run(execute) + + def _trace_upstream( + self, + traced: TracedProblem, + current_stage: str, + problem_description: str, + evidence: str, + depth: int, + ) -> None: + """Recursively trace a problem through upstream stages.""" + if depth >= self.max_depth: + traced.trace_complete = False + self._log(f" Max depth {self.max_depth} reached at {current_stage}") + return + + upstream_files = get_upstream_files(current_stage, self.output_dir) + if not upstream_files: + return # No upstream = this is the origin + + 
found_upstream = False + for upstream_name, upstream_path in upstream_files: + dedup_key = (upstream_name, problem_description) + if dedup_key in self._checked: + self._log(f" Skipping {upstream_name} (already checked for this problem)") + continue + self._checked.add(dedup_key) + + upstream_content = upstream_path.read_text(encoding="utf-8") + self._log(f" Checking upstream: {upstream_name} ({upstream_path.name})") + + result = self._check_upstream(problem_description, evidence, upstream_path.name, upstream_content) + + if result.found: + self._log(f" -> FOUND in {upstream_name}") + found_upstream = True + entry = TraceEntry( + stage=upstream_name, + file=upstream_path.name, + evidence=result.evidence or "", + is_origin=False, + ) + traced.trace.append(entry) + + # Recurse deeper + self._trace_upstream( + traced, upstream_name, problem_description, + result.evidence or evidence, depth + 1, + ) + # After recursion, if origin was found deeper, stop tracing other branches + if traced.origin_stage is not None: + return + + if not found_upstream: + # Current stage is the origin — problem exists here but not in any upstream + traced.origin_stage = current_stage + traced.depth = len(traced.trace) + # Mark the current stage entry as origin + for entry in traced.trace: + if entry.stage == current_stage: + entry.is_origin = True + + def _analyze_source_code(self, traced: TracedProblem, node_name: str, problem_description: str, evidence: str) -> None: + """Phase 3: Analyze source code at the origin stage.""" + source_paths = get_source_code_paths(node_name) + if not source_paths: + return + + source_contents: list[tuple[str, str]] = [] + for path in source_paths: + if path.exists(): + content = path.read_text(encoding="utf-8") + source_contents.append((path.name, content)) + + if not source_contents: + return + + self._log(f" Phase 3: Analyzing source code for {node_name}") + messages = build_source_code_analysis_messages(problem_description, evidence, source_contents) + + 
def execute(llm: LLM) -> SourceCodeAnalysisResult: + sllm = llm.as_structured_llm(SourceCodeAnalysisResult) + response = sllm.chat(messages) + return response.raw + + self._llm_calls += 1 + try: + analysis = self.llm_executor.run(execute) + source_file_names = [name for name, _ in source_contents] + traced.origin = OriginInfo( + stage=node_name, + file=traced.trace[-1].file if traced.trace else "", + source_code_files=source_file_names, + likely_cause=analysis.likely_cause, + suggestion=analysis.suggestion, + ) + except Exception as e: + logger.warning(f"Source code analysis failed for {node_name}: {e}") + + def _log(self, message: str) -> None: + """Print to stderr if verbose mode is enabled.""" + if self.verbose: + print(message, file=sys.stderr) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd worker_plan && python -m pytest worker_plan_internal/rca/tests/test_tracer.py -v` +Expected: All tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add worker_plan/worker_plan_internal/rca/tracer.py worker_plan/worker_plan_internal/rca/tests/test_tracer.py +git commit -m "feat: add rca recursive tracing algorithm" +``` + +--- + +### Task 4: Output — JSON and Markdown Reports + +**Files:** +- Create: `worker_plan/worker_plan_internal/rca/output.py` +- Create: `worker_plan/worker_plan_internal/rca/tests/test_output.py` + +- [ ] **Step 1: Write the failing tests** + +```python +# worker_plan/worker_plan_internal/rca/tests/test_output.py +import json +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory + +from worker_plan_internal.rca.tracer import ( + RCAResult, + TracedProblem, + TraceEntry, + OriginInfo, +) +from worker_plan_internal.rca.output import write_json_report, write_markdown_report + + +def _make_sample_result() -> RCAResult: + """Create a sample RCAResult for testing.""" + return RCAResult( + starting_file="025-2-executive_summary.md", + problem_description="Budget is unvalidated", + 
output_dir="/tmp/test_output", + problems=[ + TracedProblem( + id="problem_001", + description="Budget of CZK 500,000 is unvalidated", + severity="HIGH", + starting_evidence="CZK 500,000", + trace=[ + TraceEntry(stage="executive_summary", file="025-2-executive_summary.md", evidence="CZK 500,000", is_origin=False), + TraceEntry(stage="project_plan", file="005-2-project_plan.md", evidence="Budget: 500k", is_origin=False), + TraceEntry(stage="make_assumptions", file="003-5-make_assumptions.md", evidence="Assume budget of 500k", is_origin=True), + ], + origin_stage="make_assumptions", + origin=OriginInfo( + stage="make_assumptions", + file="003-5-make_assumptions.md", + source_code_files=["make_assumptions.py"], + likely_cause="Prompt generates budget without data", + suggestion="Add validation step", + ), + depth=3, + ), + TracedProblem( + id="problem_002", + description="Missing market sizing", + severity="MEDIUM", + starting_evidence="growing Czech market", + trace=[ + TraceEntry(stage="executive_summary", file="025-2-executive_summary.md", evidence="growing Czech market", is_origin=True), + ], + origin_stage="executive_summary", + depth=1, + ), + ], + llm_calls_made=8, + ) + + +class TestWriteJsonReport(unittest.TestCase): + def test_writes_valid_json(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.json" + result = _make_sample_result() + write_json_report(result, output_path) + + self.assertTrue(output_path.exists()) + data = json.loads(output_path.read_text(encoding="utf-8")) + self.assertIn("input", data) + self.assertIn("problems", data) + self.assertIn("summary", data) + + def test_json_contains_correct_summary(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.json" + result = _make_sample_result() + write_json_report(result, output_path) + + data = json.loads(output_path.read_text(encoding="utf-8")) + summary = data["summary"] + self.assertEqual(summary["total_problems"], 2) + 
self.assertEqual(summary["deepest_origin_node"], "make_assumptions") + self.assertEqual(summary["deepest_origin_depth"], 3) + self.assertEqual(summary["llm_calls_made"], 8) + + def test_json_problems_sorted_by_depth(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.json" + result = _make_sample_result() + write_json_report(result, output_path) + + data = json.loads(output_path.read_text(encoding="utf-8")) + depths = [f["depth"] for f in data["problems"]] + self.assertEqual(depths, sorted(depths, reverse=True)) + + +class TestWriteMarkdownReport(unittest.TestCase): + def test_writes_markdown_file(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.md" + result = _make_sample_result() + write_markdown_report(result, output_path) + + self.assertTrue(output_path.exists()) + content = output_path.read_text(encoding="utf-8") + self.assertIn("# Root Cause Analysis Report", content) + + def test_markdown_contains_problem_details(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.md" + result = _make_sample_result() + write_markdown_report(result, output_path) + + content = output_path.read_text(encoding="utf-8") + self.assertIn("Budget of CZK 500,000 is unvalidated", content) + self.assertIn("make_assumptions", content) + self.assertIn("executive_summary", content) + + def test_markdown_contains_trace_table(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.md" + result = _make_sample_result() + write_markdown_report(result, output_path) + + content = output_path.read_text(encoding="utf-8") + self.assertIn("| Node |", content) + self.assertIn("| File |", content) + + def test_empty_result_produces_valid_markdown(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.md" + result = RCAResult( + starting_file="030-report.html", + problem_description="test", + output_dir="/tmp", + problems=[], + 
llm_calls_made=1, + ) + write_markdown_report(result, output_path) + + content = output_path.read_text(encoding="utf-8") + self.assertIn("Problems found:** 0", content) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd worker_plan && python -m pytest worker_plan_internal/rca/tests/test_output.py -v` +Expected: FAIL with `ImportError` + +- [ ] **Step 3: Implement output.py** + +```python +# worker_plan/worker_plan_internal/rca/output.py +"""JSON and markdown report generation for root cause analysis results.""" +import json +from datetime import datetime, UTC +from pathlib import Path + +from worker_plan_internal.rca.tracer import RCAResult + + +def write_json_report(result: RCAResult, output_path: Path) -> None: + """Write the RCA result as a JSON file.""" + data = { + "input": { + "starting_file": result.starting_file, + "problem_description": result.problem_description, + "output_dir": result.output_dir, + "timestamp": datetime.now(UTC).isoformat(), + }, + "problems": [], + "summary": { + "total_problems": len(result.problems), + "deepest_origin_node": None, + "deepest_origin_depth": 0, + "llm_calls_made": result.llm_calls_made, + }, + } + + max_depth = 0 + deepest_stage = None + + for problem in result.problems: + problem_data = { + "id": problem.id, + "description": problem.description, + "severity": problem.severity, + "starting_evidence": problem.starting_evidence, + "trace": [ + { + "stage": entry.stage, + "file": entry.file, + "evidence": entry.evidence, + "is_origin": entry.is_origin, + } + for entry in problem.trace + ], + "origin": None, + "depth": problem.depth, + "trace_complete": problem.trace_complete, + } + + if problem.origin: + problem_data["origin"] = { + "stage": problem.origin.stage, + "file": problem.origin.file, + "source_code_files": problem.origin.source_code_files, + "likely_cause": problem.origin.likely_cause, + "suggestion": problem.origin.suggestion, + } + + if problem.depth > max_depth: + max_depth = problem.depth + 
deepest_stage = problem.origin_stage + + data["problems"].append(problem_data) + + data["summary"]["deepest_origin_node"] = deepest_stage + data["summary"]["deepest_origin_depth"] = max_depth + + output_path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") + + +def write_markdown_report(result: RCAResult, output_path: Path) -> None: + """Write the RCA result as a markdown report.""" + lines: list[str] = [] + lines.append("# Root Cause Analysis Report") + lines.append("") + lines.append(f"**Input:** {result.starting_file}") + lines.append(f"**Problems found:** {len(result.problems)}") + + if result.problems: + deepest = max(result.problems, key=lambda f: f.depth) + lines.append(f"**Deepest origin:** {deepest.origin_stage} (depth {deepest.depth})") + lines.append(f"**LLM calls:** {result.llm_calls_made}") + lines.append("") + + for problem in result.problems: + lines.append("---") + lines.append("") + lines.append(f"## {problem.id.replace('_', ' ').title()} ({problem.severity}): {problem.description}") + lines.append("") + + # Trace chain summary + node_names = [entry.stage for entry in problem.trace] + chain_parts = [] + for name in node_names: + if name == problem.origin_stage: + chain_parts.append(f"**{name}** (origin)") + else: + chain_parts.append(name) + lines.append(f"**Trace:** {' -> '.join(chain_parts)}") + lines.append("") + + if not problem.trace_complete: + lines.append("*Note: trace incomplete — max depth reached.*") + lines.append("") + + # Trace table + lines.append("| Node | File | Evidence |") + lines.append("|-------|------|----------|") + for entry in problem.trace: + node_cell = f"**{entry.stage}**" if entry.is_origin else entry.stage + evidence_cell = _escape_table_cell(entry.evidence) + lines.append(f"| {node_cell} | {entry.file} | {evidence_cell} |") + lines.append("") + + # Origin analysis + if problem.origin: + lines.append(f"**Root cause:** {problem.origin.likely_cause}") + lines.append("") + 
lines.append(f"**Source files:** {', '.join(problem.origin.source_code_files)}") + lines.append("") + lines.append(f"**Suggestion:** {problem.origin.suggestion}") + lines.append("") + + output_path.write_text("\n".join(lines), encoding="utf-8") + + +def _escape_table_cell(text: str) -> str: + """Escape pipe characters and collapse newlines for markdown table cells.""" + return text.replace("|", "\\|").replace("\n", " ") +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd worker_plan && python -m pytest worker_plan_internal/rca/tests/test_output.py -v` +Expected: All tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add worker_plan/worker_plan_internal/rca/output.py worker_plan/worker_plan_internal/rca/tests/test_output.py +git commit -m "feat: add rca JSON and markdown report generation" +``` + +--- + +### Task 5: CLI Entry Point + +**Files:** +- Create: `worker_plan/worker_plan_internal/rca/__main__.py` + +- [ ] **Step 1: Implement __main__.py** + +```python +# worker_plan/worker_plan_internal/rca/__main__.py +"""CLI entry point for RCA. + +Usage: + python -m worker_plan_internal.rca \ + --dir /path/to/output \ + --file 030-report.html \ + --problem "The budget appears unvalidated..." 
\ + --output-dir /path/to/output \ + --max-depth 15 \ + --verbose +""" +import argparse +import sys +from pathlib import Path + +from worker_plan_internal.rca.tracer import RootCauseAnalyzer +from worker_plan_internal.rca.output import write_json_report, write_markdown_report +from worker_plan_internal.llm_util.llm_executor import LLMExecutor, LLMModelFromName, RetryConfig +from worker_plan_internal.llm_factory import get_llm_names_by_priority + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Trace problems in PlanExe reports upstream to their root cause.", + ) + parser.add_argument( + "--dir", required=True, type=Path, + help="Path to the output directory containing intermediary files", + ) + parser.add_argument( + "--file", required=True, + help="Starting file to analyze (relative to --dir)", + ) + parser.add_argument( + "--problem", required=True, + help="Text description of the observed problem(s)", + ) + parser.add_argument( + "--output-dir", type=Path, default=None, + help="Where to write root_cause_analysis.json and root_cause_analysis.md (defaults to --dir)", + ) + parser.add_argument( + "--max-depth", type=int, default=15, + help="Maximum upstream hops per problem (default: 15)", + ) + parser.add_argument( + "--verbose", action="store_true", + help="Print each LLM call and result to stderr", + ) + args = parser.parse_args() + + output_dir: Path = args.dir.resolve() + if not output_dir.is_dir(): + print(f"Error: --dir is not a directory: {output_dir}", file=sys.stderr) + sys.exit(1) + + starting_file = args.file + if not (output_dir / starting_file).exists(): + print(f"Error: starting file not found: {output_dir / starting_file}", file=sys.stderr) + sys.exit(1) + + report_dir: Path = (args.output_dir or args.dir).resolve() + report_dir.mkdir(parents=True, exist_ok=True) + + # Set up LLM executor with priority-ordered models from the active profile + llm_names = get_llm_names_by_priority() + if not llm_names: + print("Error: no LLM 
models configured. Check PLANEXE_MODEL_PROFILE.", file=sys.stderr) + sys.exit(1) + + llm_models = LLMModelFromName.from_names(llm_names) + executor = LLMExecutor( + llm_models=llm_models, + retry_config=RetryConfig(max_retries=2), + max_validation_retries=1, + ) + + # Source code base is the worker_plan/ directory + source_code_base = Path(__file__).resolve().parent.parent.parent + + tracer = RootCauseAnalyzer( + output_dir=output_dir, + llm_executor=executor, + source_code_base=source_code_base, + max_depth=args.max_depth, + verbose=args.verbose, + ) + + print(f"Tracing problems in {starting_file}...", file=sys.stderr) + result = tracer.trace(starting_file, args.problem) + + # Write reports + json_path = report_dir / "root_cause_analysis.json" + md_path = report_dir / "root_cause_analysis.md" + write_json_report(result, json_path) + write_markdown_report(result, md_path) + + # Print summary + print(f"\nProblems found: {len(result.problems)}", file=sys.stderr) + if result.problems: + deepest = max(result.problems, key=lambda f: f.depth) + print(f"Deepest origin: {deepest.origin_stage} (depth {deepest.depth})", file=sys.stderr) + print(f"LLM calls made: {result.llm_calls_made}", file=sys.stderr) + print(f"\nReports written:", file=sys.stderr) + print(f" JSON: {json_path}", file=sys.stderr) + print(f" Markdown: {md_path}", file=sys.stderr) + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 2: Verify the module is importable** + +Run: `cd worker_plan && python -c "from worker_plan_internal.rca.__main__ import main; print('OK')"` +Expected: `OK` + +- [ ] **Step 3: Verify --help works** + +Run: `cd worker_plan && python -m worker_plan_internal.rca --help` +Expected: Help text showing `--dir`, `--file`, `--problem`, `--output-dir`, `--max-depth`, `--verbose` + +- [ ] **Step 4: Commit** + +```bash +git add worker_plan/worker_plan_internal/rca/__main__.py +git commit -m "feat: add rca CLI entry point" +``` + +--- + +### Task 6: Run All Tests and Final 
Verification + +**Files:** +- No new files + +- [ ] **Step 1: Run the full test suite for the rca package** + +Run: `cd worker_plan && python -m pytest worker_plan_internal/rca/tests/ -v` +Expected: All tests PASS + +- [ ] **Step 2: Run the broader worker_plan test suite to check for regressions** + +Run: `cd worker_plan && python -m pytest -v --timeout=30` +Expected: No new failures + +- [ ] **Step 3: Commit any fixes if needed** + +If tests required fixes, commit them: +```bash +git add -u +git commit -m "fix: address test issues in rca" +``` diff --git a/docs/superpowers/specs/2026-04-05-rca-design.md b/docs/superpowers/specs/2026-04-05-rca-design.md new file mode 100644 index 00000000..49d4a0de --- /dev/null +++ b/docs/superpowers/specs/2026-04-05-rca-design.md @@ -0,0 +1,286 @@ +# Root Cause Analysis (RCA) for PlanExe Reports + +> **Historical note:** This spec was written under the name "flaw tracer". The module +> has been renamed to `rca` (root cause analysis). +> The static DAG registry described here has since been replaced by `extract_dag.py` +> which introspects the Luigi task graph at import time. + +## Goal + +A CLI tool that takes a PlanExe output directory, a starting file, and a problem description, then recursively traces the problem upstream through the DAG of intermediary files to find where it originated. Produces both JSON and markdown output. Built on PlanExe's existing LLM infrastructure so it can eventually become a pipeline stage. + +## Architecture + +The tool performs a recursive depth-first search through the pipeline DAG. Starting from a downstream file where a problem is observed, it walks upstream one hop at a time — reading input files, asking an LLM whether the problem or a precursor exists there, and continuing until it reaches a node where the problem exists in the output but not in any inputs. At that origin point, it reads the node's source code to identify the likely cause. 
+ +Three LLM prompts drive the analysis: problem identification (once at the start), upstream checking (at each hop), and source code analysis (at each origin). All use Pydantic models for structured output and LLMExecutor for fallback resilience. + +## Components + +``` +worker_plan/worker_plan_internal/rca/ + __init__.py + __main__.py — CLI entry point (argparse, LLM setup, orchestration) + registry.py — Static DAG mapping: stages, output files, dependencies, source code paths + tracer.py — Recursive tracing algorithm + prompts.py — Pydantic models and LLM prompt templates + output.py — JSON + markdown report generation +``` + +### `registry.py` — DAG Mapping + +A static Python data structure mapping the full pipeline topology. Each entry describes one pipeline stage: + +```python +@dataclass +class NodeInfo: + name: str # e.g., "potential_levers" + output_files: list[str] # e.g., ["002-9-potential_levers_raw.json", "002-10-potential_levers.json"] + inputs: list[str] # e.g., ["setup", "identify_purpose", "plan_type", "extract_constraints"] + source_code_files: list[str] # Relative to worker_plan/, e.g., ["worker_plan_internal/plan/stages/potential_levers.py", "worker_plan_internal/lever/identify_potential_levers.py"] +``` + +The registry covers all ~48 pipeline stages. Key functions: + +- `find_node_by_filename(filename: str) -> NodeInfo | None` — Given an output filename, return the stage that produced it. +- `get_upstream_files(stage_name: str, output_dir: Path) -> list[tuple[str, Path]]` — Return `(stage_name, file_path)` pairs for all upstream stages, resolved against the output directory. Skip files that don't exist on disk. When a stage has multiple output files (e.g., both `_raw.json` and `.json`), prefer the clean/processed file since that's what downstream stages consume. If only the raw file exists, use that. +- `get_source_code_paths(stage_name: str) -> list[Path]` — Return absolute paths to source code files for a stage. 
+ +The mapping is derived from the Luigi task classes (`requires()` and `output()` methods) but hard-coded for reliability. When the pipeline changes, this file needs updating. + +### `prompts.py` — Pydantic Models and Prompt Templates + +Three Pydantic models for structured LLM output: + +```python +class IdentifiedProblem(BaseModel): + description: str = Field(description="One-sentence description of the problem") + evidence: str = Field(description="Direct quote from the file demonstrating the problem") + severity: Literal["HIGH", "MEDIUM", "LOW"] = Field( + description="HIGH: fabricated data or missing critical analysis. MEDIUM: weak reasoning or vague claims. LOW: minor gaps." + ) + +class ProblemIdentificationResult(BaseModel): + problems: list[IdentifiedProblem] = Field(description="List of discrete problems found in the file") + +class UpstreamCheckResult(BaseModel): + found: bool = Field(description="True if this file contains the problem or a precursor to it") + evidence: str | None = Field(description="Direct quote from the file if found, null otherwise") + explanation: str = Field(description="How this connects to the downstream problem, or why this file is clean") + +class SourceCodeAnalysisResult(BaseModel): + likely_cause: str = Field(description="What in the prompt or logic likely caused the problem") + relevant_code_section: str = Field(description="The specific code or prompt text responsible") + suggestion: str = Field(description="How to fix or prevent this problem") +``` + +Three prompt-building functions, each returning a `list[ChatMessage]`: + +**`build_problem_identification_messages(filename, file_content, user_problem_description)`** + +System message: +``` +You are analyzing an intermediary file from a project planning pipeline. +The user has identified problems in this output. Identify each discrete problem. +For each problem, provide a short description, a direct quote as evidence, and a severity level. 
+Only identify real problems — do not flag stylistic preferences or minor formatting issues. +``` + +User message contains the filename, file content, and the user's problem description. + +**`build_upstream_check_messages(problem_description, evidence_quote, upstream_filename, upstream_file_content)`** + +System message: +``` +You are tracing a problem through a project planning pipeline to find where it originated. +A downstream file contains a problem. You are examining an upstream file that was an input +to the stage that produced the problematic output. Determine if this upstream file contains +the same problem or a precursor to it. +``` + +User message contains the problem details and the upstream file content. + +**`build_source_code_analysis_messages(problem_description, evidence_quote, source_code_contents)`** + +System message: +``` +A problem was introduced at this pipeline stage. The problem exists in its output but NOT +in any of its inputs. Examine the source code to identify what in the prompt text, +logic, or processing likely caused this problem. Be specific — point to lines or prompt phrases. +``` + +User message contains the problem details and the concatenated source code. + +### `tracer.py` — Recursive Tracing Algorithm + +```python +class RootCauseAnalyzer: + def __init__(self, output_dir: Path, llm_executor: LLMExecutor, source_code_base: Path, max_depth: int = 15, verbose: bool = False): + ... + + def trace(self, starting_file: str, problem_description: str) -> RCAResult: + """Main entry point. Returns the complete trace result.""" + ... +``` + +The `trace` method implements three phases: + +**Phase 1 — Identify problems.** +Read the starting file. Build the problem identification prompt with the file content and user's description. Call the LLM via `LLMExecutor.run()` using `llm.as_structured_llm(ProblemIdentificationResult)`. Returns a list of `IdentifiedProblem` objects. 
+ +**Phase 2 — Recursive upstream trace.** +For each identified problem, call `_trace_upstream(traced, current_stage, problem_description, evidence, depth)`: + +1. Look up the current node's upstream nodes via the registry. +2. For each upstream node, resolve its output files on disk. +3. Read each upstream file. Build the upstream check prompt. Call the LLM. +4. If `found=True`: append to the trace chain and recurse into that node's upstream dependencies. +5. If `found=False`: this branch is clean, stop. +6. If depth reaches `max_depth`: stop and mark trace as incomplete. + +**Deduplication:** Track which `(node_name, problem_description)` pairs have already been analyzed. If the same problem converges on the same upstream file via multiple trace paths, the check is skipped rather than repeated. + +**Multiple upstream branches:** When a node has multiple upstream inputs and the problem is found in more than one, follow all branches. The trace can fork — the JSON output represents this as a list of trace entries per problem (each entry has a node and file), ordered from downstream to upstream. + +**Phase 3 — Source code analysis at origin.** +When a problem is found in a node's output but not in any of its inputs, that node is the origin. Read the source code files for that node (via registry). Build the source code analysis prompt. Call the LLM. Attach the result to the problem's origin data. 
+ +### `output.py` — Report Generation + +Two functions: + +**`write_json_report(result: RCAResult, output_path: Path)`** + +Writes the full trace as JSON: + +```json +{ + "input": { + "starting_file": "030-report.html", + "problem_description": "...", + "output_dir": "/path/to/output", + "timestamp": "2026-04-05T14:30:00Z" + }, + "problems": [ + { + "id": "problem_001", + "description": "Budget of CZK 500,000 is unvalidated", + "severity": "HIGH", + "starting_evidence": "quote from starting file...", + "trace": [ + { + "node": "executive_summary", + "file": "025-2-executive_summary.md", + "evidence": "...", + "is_origin": false + }, + { + "node": "make_assumptions", + "file": "003-5-make_assumptions.md", + "evidence": "...", + "is_origin": true + } + ], + "origin": { + "node": "make_assumptions", + "file": "003-5-make_assumptions.md", + "source_code_files": ["stages/make_assumptions.py", "assumption/make_assumptions.py"], + "likely_cause": "The prompt asks the LLM to...", + "suggestion": "Add a validation step that..." + }, + "depth": 2 + } + ], + "summary": { + "total_problems": 3, + "deepest_origin_node": "make_assumptions", + "deepest_origin_depth": 3, + "llm_calls_made": 12 + } +} +``` + +**`write_markdown_report(result: RCAResult, output_path: Path)`** + +Writes a human-readable report: + +```markdown +# Root Cause Analysis Report + +**Input:** 030-report.html +**Problems found:** 3 +**Deepest origin:** make_assumptions (depth 3) + +--- + +## Problem 1 (HIGH): Budget of CZK 500,000 is unvalidated + +**Trace:** executive_summary -> project_plan -> **make_assumptions** (origin) + +| Node | File | Evidence | +|------|------|----------| +| executive_summary | 025-2-executive_summary.md | "The budget is CZK 500,000..." | +| project_plan | 005-2-project_plan.md | "Estimated budget: CZK 500,000..." | +| **make_assumptions** | 003-5-make_assumptions.md | "Assume total budget..." 
| + +**Root cause:** The prompt asks the LLM to generate budget assumptions +without requiring external data sources... + +**Suggestion:** Add a validation step that... +``` + +Problems are sorted by depth (deepest origin first) so the most upstream root cause appears at the top. + +### `__main__.py` — CLI Entry Point + +``` +python -m worker_plan_internal.rca \ + --dir /path/to/output \ + --file 030-report.html \ + --problem "The budget is CZK 500,000 but this number appears unvalidated..." \ + --output-dir /path/to/output \ + --max-depth 15 \ + --verbose +``` + +Arguments: +- `--dir` (required): Path to the output directory containing intermediary files. +- `--file` (required): Starting file to analyze, relative to `--dir`. +- `--problem` (required): Text description of the observed problem(s). +- `--output-dir` (optional): Where to write `root_cause_analysis.json` and `root_cause_analysis.md`. Defaults to `--dir`. +- `--max-depth` (optional): Maximum upstream hops per problem. Default 15. +- `--verbose` (optional): Print each LLM call and result to stderr as the trace runs. + +Orchestration: +1. Parse arguments. +2. Resolve priority-ordered model names via `get_llm_names_by_priority()` and create an `LLMExecutor` from them. +3. Create `RootCauseAnalyzer` instance. +4. Call `analyzer.trace(starting_file, problem_description)`. +5. Write JSON and markdown reports via `output.py`. +6. Print summary to stderr. + +## LLM Infrastructure Integration + +- **LLMExecutor** with `LLMModelFromName.from_names()` for multi-model fallback. +- **Pydantic models** with `llm.as_structured_llm()` for all three prompt types. +- **Model profile** loaded from `PLANEXE_MODEL_PROFILE` environment variable (defaults to baseline). +- **RetryConfig** with defaults (2 retries, exponential backoff) for transient errors. +- **`max_validation_retries=1`** to allow one structured output retry with feedback on parse failure. 
+ +## Scope Boundaries + +**In scope:** +- CLI tool with `--dir`, `--file`, `--problem`, `--output-dir`, `--max-depth`, `--verbose`. +- Static registry of all ~48 pipeline stages with dependencies and source code paths. +- Recursive depth-first upstream tracing with three LLM prompt types. +- JSON + markdown output sorted by trace depth. +- Source code analysis only at origin stages (lazy evaluation). +- Full file contents sent to LLM (no chunking or summarization). + +**Out of scope (future work):** +- Library/module API (CLI first, refactor later). +- Integration as a Luigi pipeline stage. +- Approach B (full reverse-topological sweep). +- Approach C (scout-then-trace optimization). +- Automatic registry generation from Luigi task introspection. +- UI/web integration. diff --git a/worker_plan/worker_plan_internal/extract_dag.py b/worker_plan/worker_plan_internal/extract_dag.py index 642e2727..eb0665bf 100644 --- a/worker_plan/worker_plan_internal/extract_dag.py +++ b/worker_plan/worker_plan_internal/extract_dag.py @@ -135,29 +135,75 @@ def _detect_implementation_files(cls: type) -> list[str]: return files -def _extract_source_files(task: luigi.Task) -> list[str]: - """Get source files: task's own file + auto-detected implementation files.""" +def _extract_source_files(task: luigi.Task) -> list[dict[str, str]]: + """Get source files: workflow node file + auto-detected business logic files.""" cls = type(task) + files: list[dict[str, str]] = [] - # The task's own file - result: list[str] = [] + # The task's own file (workflow node) try: task_file = Path(inspect.getfile(cls)).resolve() - result.append(str(task_file.relative_to(_WORKER_PLAN_DIR))) + files.append({ + "role": "workflow_node", + "path": str(task_file.relative_to(_WORKER_PLAN_DIR)), + }) except (TypeError, ValueError, OSError): pass - # Supplement with auto-detected implementation files - for f in _detect_implementation_files(cls): - if f not in result: - result.append(f) + # Auto-detected implementation 
files (business logic) + seen = {f["path"] for f in files} + for path in _detect_implementation_files(cls): + if path not in seen: + files.append({ + "role": "business_logic", + "path": path, + }) + seen.add(path) + + return files + + +def _pick_primary_output(filenames: list[str]) -> str: + """Pick the most likely file to be read from a node's outputs. + + Preference: .md > .html > non-raw file > first file. + """ + for ext in (".md", ".html"): + for f in filenames: + if f.endswith(ext): + return f + non_raw = [f for f in filenames if "_raw" not in f] + if non_raw: + return non_raw[0] + return filenames[0] if filenames else "" + + +def _extract_inputs(upstream_tasks: list[luigi.Task]) -> list[dict[str, str]]: + """Build inputs list: for each upstream task, identify the primary artifact it provides.""" + inputs: list[dict[str, str]] = [] + seen: set[str] = set() + + for dep in upstream_tasks: + node_name = _class_name_to_stage_name(dep.__class__.__name__) + if node_name in seen: + continue + seen.add(node_name) + + output_files = _extract_output_filenames(dep) + primary = _pick_primary_output(output_files) + if primary: + inputs.append({ + "from_node": node_name, + "artifact_path": primary, + }) - return result + inputs.sort(key=lambda x: x["from_node"]) + return inputs def _output_sort_key(stage: dict[str, Any]) -> tuple[int, int, str]: """Sort key: numeric prefix from the first output filename, then name.""" - filename = stage["output_files"][0] if stage.get("output_files") else "" + filename = stage["artifacts"][0]["path"] if stage.get("artifacts") else "" match = re.match(r"(\d+)-?(\d+)?", filename) if match: major = int(match.group(1)) @@ -197,18 +243,15 @@ def _walk(task: luigi.Task) -> None: cls = type(task) stage_name = _class_name_to_stage_name(class_name) description = cls.description() if hasattr(cls, "description") else "" - output_files = _extract_output_filenames(task) + artifacts = [{"path": f} for f in _extract_output_filenames(task)] + inputs = 
_extract_inputs(upstream_tasks) source_files = _extract_source_files(task) - depends_on_names = sorted(set( - _class_name_to_stage_name(dep.__class__.__name__) - for dep in upstream_tasks - )) stages.append({ "id": stage_name, "description": description, - "output_files": output_files, - "depends_on": depends_on_names, + "artifacts": artifacts, + "inputs": inputs, "source_files": source_files, }) diff --git a/worker_plan/worker_plan_internal/rca/AGENTS.md b/worker_plan/worker_plan_internal/rca/AGENTS.md new file mode 100644 index 00000000..f7ec4f64 --- /dev/null +++ b/worker_plan/worker_plan_internal/rca/AGENTS.md @@ -0,0 +1,97 @@ +# Root Cause Analysis (RCA) — Status and Known Issues + +## What works well + +- **DAG is auto-generated.** The registry builds from `extract_dag.py` via Luigi task introspection at import time — no hand-maintained mapping. Adding, removing, or renaming pipeline nodes requires zero manual updates. +- **Phase 1 anchors to the user's problem.** The user's specific problem is always the first result, with additional problems limited to the same family. +- **Upstream checks require causal links.** The prompt requires the LLM to explain *how* upstream content caused the downstream problem, not just topical overlap. This produces tighter, more accurate traces. +- **Phase 3 classifies root causes.** Each origin is categorized as `prompt_fixable`, `domain_complexity`, or `missing_input`. Verified: the India census caste enumeration problem is correctly classified as `domain_complexity`, while the workforce feasibility problem is `prompt_fixable`. +- **Evidence quotes are concise.** Both Phase 1 and Phase 2 prompts instruct the LLM to keep quotes under 200 characters. +- **Source code filenames are disambiguated.** Shows `nodes/identify_purpose.py` and `assume/identify_purpose.py` instead of duplicate bare filenames. 
+- **Depth sorting is useful.** Deepest root causes appear first, matching the user's intent of finding the earliest upstream origin. +- **Events.jsonl enables live monitoring.** Users can `tail -f events.jsonl` to watch progress instead of waiting blindly. +- **Focused output.** A typical run finds 2-3 problems in the same family and makes 15-30 LLM calls (down from 17 problems / 153 calls before prompt improvements). +- **DAG schema is rich.** Each node has artifacts, inputs (with from_node + artifact_path), and source_files (with role: workflow_node or business_logic). This enables artifact-level provenance tracing. + +## Fixed issues + +### Phase 1 didn't anchor to user's problem (was HIGH, fixed) + +The Phase 1 prompt now requires the user's specific problem as the first item, with additional problems limited to the same family. Before the fix, the LLM would ignore the user's problem and identify unrelated issues. + +### Upstream checks were too loose (was MEDIUM, fixed) + +The Phase 2 prompt now requires a causal mechanism ("how did this upstream content lead to the downstream problem?") and explicitly rejects topical overlap. Before the fix, the LLM would say "found" whenever an upstream file discussed a related topic. + +### Evidence quotes were too long (was MEDIUM, fixed) + +Both Phase 1 and Phase 2 prompts now instruct "keep quotes under 200 characters." Before the fix, evidence fields contained entire JSON objects (100+ lines). 
+ +### Phase 3 always blamed the prompt (was MEDIUM, fixed) + +Phase 3 now classifies each root cause into one of three categories: +- **prompt_fixable** — the prompt has a gap that can be edited (e.g., "list specific permits with lead times") +- **domain_complexity** — inherently uncertain or contentious, no prompt change resolves it (e.g., caste enumeration politics in India) +- **missing_input** — the user's plan didn't provide enough detail + +Before the fix, every suggestion was "modify the system prompt" even when the real issue was domain complexity. Verified on India census run: caste enumeration correctly classified as `domain_complexity`, workforce feasibility as `prompt_fixable`. + +### Duplicate source code filenames (was LOW, fixed) + +Source code paths now include the parent directory (`nodes/identify_purpose.py`) to disambiguate files with the same name in different packages. + +### Static registry drifted (was MEDIUM, fixed) + +The DAG registry was a 780-line hand-maintained copy of the pipeline topology. Now replaced with `extract_dag.py` which introspects the Luigi task graph at import time. Zero maintenance needed when pipeline changes. + +## Open issues + +### MEDIUM: Non-determinism untested + +This is LLM judging LLM output. Every upstream check is a subjective call. Two runs on the same input may produce different traces. We haven't tested reproducibility — run the same input 3 times and compare. If traces diverge significantly, consider requiring higher-confidence matches or running multiple passes and intersecting results. + +### LOW: First-match-wins may miss parallel origins + +The `_trace_upstream` method follows only the first upstream branch where the problem is found. Real problems often have multiple contributing causes from parallel branches, but only one is traced. The trace looks clean and linear, but reality is messier. 
+ +**Fix direction:** Add a `--thorough` mode that follows all branches where the problem is found, producing a tree instead of a chain. + +### LOW: Problem convergence on same origin + +After prompt tightening, convergence makes sense — problems in the same family naturally trace to the same origin. Monitor across more diverse runs. + +### LOW: Artifact-level only, not claim-level + +The tool traces at the artifact level (which file introduced the problem) but cannot yet attribute individual sentences to specific input spans. See `docs/proposals/133-dag-and-rca.md` for the gap analysis and future directions. + +## Test runs completed + +1. **India census v1** (`20250101_india_census`): Old prompts. 17 problems, 153 LLM calls, deepest origin: `potential_levers` (depth 6). Problems not anchored to user input, traces loose, evidence bloated. + +2. **Minecraft escape v1** (`20251016_minecraft_escape`): Old prompts. Problem about zoning/permits. 5 problems, 43 LLM calls. User's problem not identified. Exposed Phase 1 anchoring problem. + +3. **Minecraft escape v2** (`20251016_minecraft_escape`): New prompts. 3 problems, 31 LLM calls, deepest origin: `identify_risks` (depth 5). User's problem correctly identified as problem_001. All problems in same family (regulatory gaps). + +4. **India census v2** (`20250101_india_census`): New prompts. 2 problems (down from 17), 17 LLM calls (down from 153), deepest origin: `potential_levers` (depth 6). User's problem correctly identified. Exposed Phase 3 "always blames prompt" limitation. + +5. **India census v3** (`20250101_india_census`): New prompts + Phase 3 classification. 2 problems, 17 LLM calls. Caste enumeration correctly classified as `domain_complexity`. Workforce feasibility correctly classified as `prompt_fixable`. All fixes verified working. + +## Honest assessment + +The tool is a useful diagnostic prototype for root cause analysis. 
The trace chains are the most trustworthy part — they're mechanically grounded in the DAG structure. The suggestions are LLM opinions — useful starting points, not patches. + +The category classification (`prompt_fixable` / `domain_complexity` / `missing_input`) turned out to be the most valuable feature. It prevents wasted effort on problems that can't be fixed by prompt editing. + +The tool is diagnostic, not prescriptive. It tells you *where* a problem originated and *why*, but someone still has to decide what to do. It can't catch problems that don't leave textual evidence — timing issues, model-specific quirks, or structural DAG problems are invisible. + +Starting from `029-2-self_audit.md` is the sweet spot. That file already contains identified issues, so the tracer is tracing known problems upstream rather than discovering problems from scratch. + +Before relying on this for automated decisions (e.g., in the self-improve loop), it needs more diverse test runs (10+ plans) and reproducibility testing. + +## Architecture notes + +- The tool runs from `worker_plan/` directory using Python 3.11. +- The DAG registry is built from `extract_dag.py` at import time — no static data. +- LLM calls go through `LLMExecutor` with the active model profile (`PLANEXE_MODEL_PROFILE`). +- The `record_usage_metric called but no usage metrics path is set` warnings are harmless — the RCA tool doesn't set up the metrics path since it's a standalone CLI tool, not a pipeline task. +- The first-match-wins strategy in `_trace_upstream` means only one upstream branch is followed per problem. If the problem exists in multiple upstream branches, only the first one encountered is traced. 
diff --git a/worker_plan/worker_plan_internal/rca/README.md b/worker_plan/worker_plan_internal/rca/README.md new file mode 100644 index 00000000..df83c481 --- /dev/null +++ b/worker_plan/worker_plan_internal/rca/README.md @@ -0,0 +1,154 @@ +# Root Cause Analysis (RCA) for PlanExe + +Given a problem observed in a pipeline output, this tool traces upstream through the DAG of intermediary artifacts to find where the problem originated and classify its root cause. The classification (`prompt_fixable`, `domain_complexity`, `missing_input`) tells the self-improve loop whether a problem can be fixed automatically by editing a prompt, or whether it requires human input or is an inherent domain limitation. + +## How it works + +PlanExe runs a DAG of ~70 nodes. Each node reads upstream artifacts, calls an LLM, and writes output artifacts (prefixed `001-` through `030-`). Problems introduced early propagate downstream into later nodes and the final report. + +The DAG structure is extracted automatically from the Luigi task graph by `extract_dag.py` — no hand-maintained registry needed. The registry builds at import time via Luigi task introspection. + +The tool performs a recursive depth-first search: + +1. **Phase 1 — Identify problems.** Reads the starting artifact and locates the specific problem you described, plus any closely related problems in the same family. +2. **Phase 2 — Trace upstream.** For each problem, walks upstream through the DAG one hop at a time, asking the LLM whether the problem was *caused by* content in each input artifact (requires causal link, not just topical overlap). Continues until it finds a node where the problem exists in the output but not in any inputs. +3. 
**Phase 3 — Analyze source code and classify.** At the origin node, reads the Python source code and classifies the root cause: + - **Prompt fixable** — the prompt has a gap that can be fixed by editing it + - **Domain complexity** — the topic is inherently uncertain or contentious, no prompt change resolves it + - **Missing input** — the user's plan prompt didn't provide enough detail + +Output is a JSON file (`root_cause_analysis.json`), a markdown report (`root_cause_analysis.md`), and a live event log (`events.jsonl`), sorted by trace depth so the deepest root cause appears first. + +## DAG integration + +The pipeline DAG is defined in `extract_dag.py` which introspects the actual Luigi task graph at import time. Each node in the DAG provides: +- **artifacts** — output files the node produces +- **inputs** — which upstream node and specific artifact each node reads +- **source_files** — the workflow node file and business logic files + +This means the RCA tool always stays in sync with the pipeline — no manual registry updates needed when nodes are added, removed, or renamed. 
+ +## Prerequisites + +- Python 3.11 (`/opt/homebrew/bin/python3.11` on macOS with Homebrew) +- An LLM configured via `PLANEXE_MODEL_PROFILE` environment variable (defaults to `baseline`) +- API key for your LLM provider (e.g., `OPENROUTER_API_KEY`) + +## Usage + +All commands are run from the `worker_plan/` directory: + +```bash +cd worker_plan +``` + +Basic usage: + +```bash +/opt/homebrew/bin/python3.11 -m worker_plan_internal.rca \ + --dir /path/to/output \ + --file 030-report.html \ + --problem "Description of the problem you observed" \ + --verbose +``` + +### Arguments + +| Argument | Required | Description | +|----------|----------|-------------| +| `--dir` | Yes | Path to the output directory containing intermediary artifacts | +| `--file` | Yes | Starting artifact to analyze (relative to `--dir`) | +| `--problem` | Yes | Text description of the observed problem(s) | +| `--output-dir` | No | Where to write reports (defaults to `--dir`) | +| `--max-depth` | No | Maximum upstream hops per problem (default: 15) | +| `--verbose` | No | Print each LLM call to stderr as the trace runs | + +### Starting files + +You can start from any intermediary artifact. Common starting points: + +| File | What it is | +|------|------------| +| `030-report.html` | The final HTML report (largest, most problems to find) | +| `029-2-self_audit.md` | Self-audit (already identifies issues — good for tracing them back) | +| `025-2-executive_summary.md` | Executive summary | +| `024-2-review_plan.md` | Plan review | +| `028-2-premortem.md` | Premortem analysis | + +### Examples + +Trace a problem from the self-audit: + +```bash +/opt/homebrew/bin/python3.11 -m worker_plan_internal.rca \ + --dir /path/to/output/20250101_india_census \ + --file 029-2-self_audit.md \ + --problem "No Real-World Proof. The plan combines a digital census with caste enumeration at an unprecedented scale, lacking independent evidence of success." 
\ + --output-dir /tmp/rca-analysis \ + --verbose +``` + +Trace a zoning/permits problem: + +```bash +/opt/homebrew/bin/python3.11 -m worker_plan_internal.rca \ + --dir /path/to/output/20251016_minecraft_escape \ + --file 029-2-self_audit.md \ + --problem "Infeasible Constraints Rated MEDIUM because the plan mentions zoning and permits but lacks specifics for the Shanghai location." \ + --output-dir /tmp/rca-analysis2 \ + --verbose +``` + +### Monitoring progress + +While the tracer runs, watch the live event log in another terminal: + +```bash +tail -f /tmp/rca-analysis/events.jsonl +``` + +### Output + +Each run produces three files in `--output-dir` (or `--dir` if not specified): + +- `root_cause_analysis.json` — machine-readable trace with full details +- `root_cause_analysis.md` — human-readable report with trace tables +- `events.jsonl` — live event log for monitoring progress + +Problems are sorted by trace depth (deepest root cause first). Each problem's origin includes a **category** (`prompt_fixable`, `domain_complexity`, or `missing_input`) so you know whether the fix is a prompt edit, a domain limitation to accept, or a need for more detail in the plan input. + +A typical run finds 2-3 focused problems and makes 15-30 LLM calls. + +## RCA investigation strategy + +The tool implements the investigation strategy described in `docs/proposals/133-dag-and-rca.md`: + +1. Start from the final artifact (e.g., `030-report.html`) +2. Inspect direct input artifacts to the producing node +3. Search those artifacts for the false claim or problem +4. When found upstream, recurse into that node's inputs +5. Continue until reaching the earliest artifact containing the problem +6. Inspect the producing node's source files +7. Classify the failure mode + +## Tips + +- **Start from `029-2-self_audit.md`.** This file already contains identified issues, so you're tracing *known* problems upstream rather than asking the LLM to find problems from scratch. 
+- **Trust the trace chains more than the suggestions.** The upstream path (which nodes the problem passed through) is mechanically grounded in the DAG. The suggestions are LLM opinions — useful starting points, not patches. +- **Check the category before acting.** If the origin is `domain_complexity`, don't spend time tweaking the prompt. If it's `prompt_fixable`, the suggestion is likely actionable. +- **Results are non-deterministic.** This is LLM judging LLM output. Two runs on the same input may produce slightly different traces. If a finding matters, run it twice. + +## Limitations + +- **LLM subjectivity.** Every hop in the trace is a judgment call by the LLM ("did this upstream artifact cause the downstream problem?"). The causal-link requirement helps, but it's still one LLM's opinion. +- **First-match-wins.** When a problem has precursors in multiple parallel upstream branches, only the first branch found is followed. Real problems often have multiple contributing causes. +- **Text-only.** The tracer can only catch problems that leave textual evidence in intermediary artifacts. Timing issues, model-specific quirks, or structural DAG problems are invisible to it. +- **Artifact-level, not claim-level.** The tool can identify which artifact and node likely introduced a problem, but cannot yet prove which exact sentence transformation introduced a specific false claim (see `docs/proposals/133-dag-and-rca.md` for the gap analysis). +- **Diagnostic, not prescriptive.** It tells you *where* and *why*, but someone still has to decide what to do about it. 
+ +## Running tests + +```bash +cd worker_plan +/opt/homebrew/bin/python3.11 -m pytest worker_plan_internal/rca/tests/ -v +``` diff --git a/worker_plan/worker_plan_internal/rca/__init__.py b/worker_plan/worker_plan_internal/rca/__init__.py new file mode 100644 index 00000000..adec8b49 --- /dev/null +++ b/worker_plan/worker_plan_internal/rca/__init__.py @@ -0,0 +1 @@ +"""RCA — Root-cause analysis for PlanExe reports.""" diff --git a/worker_plan/worker_plan_internal/rca/__main__.py b/worker_plan/worker_plan_internal/rca/__main__.py new file mode 100644 index 00000000..aea03cf9 --- /dev/null +++ b/worker_plan/worker_plan_internal/rca/__main__.py @@ -0,0 +1,111 @@ +# worker_plan/worker_plan_internal/rca/__main__.py +"""CLI entry point for RCA (root cause analysis). + +Usage: + python -m worker_plan_internal.rca \ + --dir /path/to/output \ + --file 030-report.html \ + --problem "The budget appears unvalidated..." \ + --output-dir /path/to/output \ + --max-depth 15 \ + --verbose +""" +import argparse +import sys +from pathlib import Path + +from worker_plan_internal.rca.tracer import RootCauseAnalyzer +from worker_plan_internal.rca.output import write_json_report, write_markdown_report +from worker_plan_internal.llm_util.llm_executor import LLMExecutor, LLMModelFromName, RetryConfig +from worker_plan_internal.llm_factory import get_llm_names_by_priority + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Trace problems in PlanExe reports upstream to their root cause.", + ) + parser.add_argument( + "--dir", required=True, type=Path, + help="Path to the output directory containing intermediary files", + ) + parser.add_argument( + "--file", required=True, + help="Starting file to analyze (relative to --dir)", + ) + parser.add_argument( + "--problem", required=True, + help="Text description of the observed problem(s)", + ) + parser.add_argument( + "--output-dir", type=Path, default=None, + help="Where to write root_cause_analysis.json and 
root_cause_analysis.md (defaults to --dir)", + ) + parser.add_argument( + "--max-depth", type=int, default=15, + help="Maximum upstream hops per problem (default: 15)", + ) + parser.add_argument( + "--verbose", action="store_true", + help="Print each LLM call and result to stderr", + ) + args = parser.parse_args() + + output_dir: Path = args.dir.resolve() + if not output_dir.is_dir(): + print(f"Error: --dir is not a directory: {output_dir}", file=sys.stderr) + sys.exit(1) + + starting_file = args.file + if not (output_dir / starting_file).exists(): + print(f"Error: starting file not found: {output_dir / starting_file}", file=sys.stderr) + sys.exit(1) + + report_dir: Path = (args.output_dir or args.dir).resolve() + report_dir.mkdir(parents=True, exist_ok=True) + + # Set up LLM executor with priority-ordered models from the active profile + llm_names = get_llm_names_by_priority() + if not llm_names: + print("Error: no LLM models configured. Check PLANEXE_MODEL_PROFILE.", file=sys.stderr) + sys.exit(1) + + llm_models = LLMModelFromName.from_names(llm_names) + executor = LLMExecutor( + llm_models=llm_models, + retry_config=RetryConfig(max_retries=2), + max_validation_retries=1, + ) + + events_path = report_dir / "events.jsonl" + + tracer = RootCauseAnalyzer( + output_dir=output_dir, + llm_executor=executor, + max_depth=args.max_depth, + verbose=args.verbose, + events_path=events_path, + ) + + print(f"Tracing problems in {starting_file}...", file=sys.stderr) + result = tracer.trace(starting_file, args.problem) + + # Write reports + json_path = report_dir / "root_cause_analysis.json" + md_path = report_dir / "root_cause_analysis.md" + write_json_report(result, json_path) + write_markdown_report(result, md_path) + + # Print summary + print(f"\nProblems found: {len(result.problems)}", file=sys.stderr) + if result.problems: + deepest = max(result.problems, key=lambda p: p.depth) + print(f"Deepest origin: {deepest.origin_node} (depth {deepest.depth})", file=sys.stderr) + 
print(f"LLM calls made: {result.llm_calls_made}", file=sys.stderr) + print(f"\nReports written:", file=sys.stderr) + print(f" JSON: {json_path}", file=sys.stderr) + print(f" Markdown: {md_path}", file=sys.stderr) + print(f" Events: {events_path}", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/worker_plan/worker_plan_internal/rca/output.py b/worker_plan/worker_plan_internal/rca/output.py new file mode 100644 index 00000000..7f3c2dbf --- /dev/null +++ b/worker_plan/worker_plan_internal/rca/output.py @@ -0,0 +1,141 @@ +# worker_plan/worker_plan_internal/rca/output.py +"""JSON and markdown report generation for root cause analysis results.""" +import json +from datetime import datetime, UTC +from pathlib import Path + +from worker_plan_internal.rca.tracer import RCAResult + + +def write_json_report(result: RCAResult, output_path: Path) -> None: + """Write the RCA result as a JSON file.""" + data = { + "input": { + "starting_file": result.starting_file, + "problem_description": result.problem_description, + "output_dir": result.output_dir, + "timestamp": datetime.now(UTC).isoformat(), + }, + "problems": [], + "summary": { + "total_problems": len(result.problems), + "deepest_origin_node": None, + "deepest_origin_depth": 0, + "llm_calls_made": result.llm_calls_made, + }, + } + + max_depth = 0 + deepest_node = None + + for problem in result.problems: + problem_data = { + "id": problem.id, + "description": problem.description, + "severity": problem.severity, + "starting_evidence": problem.starting_evidence, + "trace": [ + { + "node": entry.node, + "file": entry.file, + "evidence": entry.evidence, + "is_origin": entry.is_origin, + } + for entry in problem.trace + ], + "origin": None, + "depth": problem.depth, + "trace_complete": problem.trace_complete, + } + + if problem.origin: + problem_data["origin"] = { + "node": problem.origin.node, + "file": problem.origin.file, + "source_code_files": problem.origin.source_code_files, + "category": 
problem.origin.category, + "likely_cause": problem.origin.likely_cause, + "suggestion": problem.origin.suggestion, + } + + if problem.depth > max_depth: + max_depth = problem.depth + deepest_node = problem.origin_node + + data["problems"].append(problem_data) + + data["problems"].sort(key=lambda p: p["depth"], reverse=True) + data["summary"]["deepest_origin_node"] = deepest_node + data["summary"]["deepest_origin_depth"] = max_depth + + output_path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") + + +def write_markdown_report(result: RCAResult, output_path: Path) -> None: + """Write the RCA result as a markdown report.""" + lines: list[str] = [] + lines.append("# Root Cause Analysis Report") + lines.append("") + lines.append(f"**Input:** {result.starting_file}") + lines.append(f"**Problems found:** {len(result.problems)}") + + if result.problems: + deepest = max(result.problems, key=lambda p: p.depth) + lines.append(f"**Deepest origin:** {deepest.origin_node} (depth {deepest.depth})") + lines.append(f"**LLM calls:** {result.llm_calls_made}") + lines.append("") + + sorted_problems = sorted(result.problems, key=lambda p: p.depth, reverse=True) + for problem in sorted_problems: + lines.append("---") + lines.append("") + lines.append(f"## {problem.id.replace('_', ' ').title()} ({problem.severity}): {problem.description}") + lines.append("") + + # Trace chain summary + node_names = [entry.node for entry in problem.trace] + chain_parts = [] + for name in node_names: + if name == problem.origin_node: + chain_parts.append(f"**{name}** (origin)") + else: + chain_parts.append(name) + lines.append(f"**Trace:** {' -> '.join(chain_parts)}") + lines.append("") + + if not problem.trace_complete: + lines.append("*Note: trace incomplete — max depth reached.*") + lines.append("") + + # Trace table + lines.append("| Node | File | Evidence |") + lines.append("|-------|------|----------|") + for entry in problem.trace: + node_cell = f"**{entry.node}**" if 
entry.is_origin else entry.node + evidence_cell = _escape_table_cell(entry.evidence) + lines.append(f"| {node_cell} | {entry.file} | {evidence_cell} |") + lines.append("") + + # Origin analysis + if problem.origin: + category_labels = { + "prompt_fixable": "Prompt fixable", + "domain_complexity": "Domain complexity", + "missing_input": "Missing input", + } + category_label = category_labels.get(problem.origin.category, problem.origin.category) + lines.append(f"**Category:** {category_label}") + lines.append("") + lines.append(f"**Root cause:** {problem.origin.likely_cause}") + lines.append("") + lines.append(f"**Source files:** {', '.join(problem.origin.source_code_files)}") + lines.append("") + lines.append(f"**Suggestion:** {problem.origin.suggestion}") + lines.append("") + + output_path.write_text("\n".join(lines), encoding="utf-8") + + +def _escape_table_cell(text: str) -> str: + """Escape pipe characters and collapse newlines for markdown table cells.""" + return text.replace("|", "\\|").replace("\n", " ") diff --git a/worker_plan/worker_plan_internal/rca/prompts.py b/worker_plan/worker_plan_internal/rca/prompts.py new file mode 100644 index 00000000..93173dc6 --- /dev/null +++ b/worker_plan/worker_plan_internal/rca/prompts.py @@ -0,0 +1,152 @@ +# worker_plan/worker_plan_internal/rca/prompts.py +"""Pydantic models and prompt builders for RCA.""" +from typing import Literal +from pydantic import BaseModel, Field +from llama_index.core.llms import ChatMessage, MessageRole + + +# -- Pydantic models for structured LLM output -------------------------------- + +class IdentifiedProblem(BaseModel): + """A discrete problem found in a pipeline output file.""" + description: str = Field(description="One-sentence description of the problem") + evidence: str = Field(description="Direct quote from the file demonstrating the problem") + severity: Literal["HIGH", "MEDIUM", "LOW"] = Field( + description="HIGH: fabricated data or missing critical analysis. 
MEDIUM: weak reasoning or vague claims. LOW: minor gaps." + ) + + +class ProblemIdentificationResult(BaseModel): + """Result of analyzing a file for problems.""" + problems: list[IdentifiedProblem] = Field(description="List of discrete problems found in the file") + + +class UpstreamCheckResult(BaseModel): + """Result of checking an upstream file for a problem precursor.""" + found: bool = Field(description="True if this file contains the problem or a precursor to it") + evidence: str | None = Field(description="Direct quote from the file if found, null otherwise") + explanation: str = Field(description="How this connects to the downstream problem, or why this file is clean") + + +class SourceCodeAnalysisResult(BaseModel): + """Result of analyzing source code at a problem's origin node.""" + category: Literal["prompt_fixable", "domain_complexity", "missing_input"] = Field( + description=( + "prompt_fixable: the prompt forgot to ask for something or has a gap that can be fixed by editing the prompt. " + "domain_complexity: the topic is inherently uncertain, contentious, or requires domain expertise that no prompt change can resolve. " + "missing_input: the user's plan prompt didn't provide enough context for the pipeline to work with." + ) + ) + likely_cause: str = Field(description="What in the prompt, logic, or domain caused the problem") + relevant_code_section: str = Field(description="The specific code or prompt text responsible") + suggestion: str = Field(description="How to fix or prevent this problem") + + +# -- Prompt builders ----------------------------------------------------------- + +def build_problem_identification_messages( + filename: str, + file_content: str, + user_problem_description: str, +) -> list[ChatMessage]: + """Build messages for Phase 1: identifying discrete problems in a file.""" + system = ( + "You are analyzing an intermediary file from a project planning pipeline.\n" + "The user has described a specific problem they observed. 
Your job:\n\n"
+        "1. FIRST, locate the user's specific problem in the file. Find the passage that "
+        "corresponds to what the user described. This problem MUST be the first item in your list.\n"
+        "2. THEN, identify any additional discrete problems that are closely related to the "
+        "user's concern (e.g., other instances of the same problem pattern, or problems that "
+        "share the same root cause). Do NOT list every possible problem in the file — only "
+        "those connected to what the user raised.\n\n"
+        "For each problem, provide a short description (one sentence), a direct quote "
+        "from the file as evidence (keep quotes under 200 characters), and a severity level.\n"
+        "Only identify real problems — do not flag stylistic preferences or minor formatting issues.\n"
+        "Severity levels:\n"
+        "- HIGH: fabricated data, invented statistics, or missing critical analysis\n"
+        "- MEDIUM: weak reasoning, vague unsupported claims, or shallow treatment\n"
+        "- LOW: minor gaps that don't significantly impact the plan"
+    )
+    user = (
+        f"User's problem description:\n{user_problem_description}\n\n"
+        f"Filename: {filename}\n"
+        f"File content:\n{file_content}"
+    )
+    return [
+        ChatMessage(role=MessageRole.SYSTEM, content=system),
+        ChatMessage(role=MessageRole.USER, content=user),
+    ]
+
+
+def build_upstream_check_messages(
+    problem_description: str,
+    evidence_quote: str,
+    upstream_filename: str,
+    upstream_file_content: str,
+) -> list[ChatMessage]:
+    """Build messages for Phase 2: checking if a problem exists in an upstream file."""
+    system = (
+        "You are tracing a problem through a project planning pipeline to find where it originated.\n"
+        "A downstream file contains a problem. 
You are examining an upstream file that was an input " + "to the node that produced the problematic output.\n\n" + "Determine if this upstream file CAUSED or CONTRIBUTED to the downstream problem.\n" + "This means the upstream file contains content that was carried forward, transformed, " + "or amplified into the downstream problem. Merely discussing a related topic is NOT enough.\n\n" + "If YES: quote the specific sentence or phrase (under 200 characters) and explain " + "the causal mechanism — how this upstream content led to the downstream problem.\n" + "If NO: explain why this file is clean regarding this specific problem.\n\n" + "Be strict. Only say YES if you can identify a clear causal link, not just topical overlap." + ) + user = ( + f"Problem: {problem_description}\n" + f"Evidence from downstream: {evidence_quote}\n\n" + f"Upstream filename: {upstream_filename}\n" + f"Upstream file content:\n{upstream_file_content}" + ) + return [ + ChatMessage(role=MessageRole.SYSTEM, content=system), + ChatMessage(role=MessageRole.USER, content=user), + ] + + +def build_source_code_analysis_messages( + problem_description: str, + evidence_quote: str, + source_code_contents: list[tuple[str, str]], +) -> list[ChatMessage]: + """Build messages for Phase 3: analyzing source code at problem origin. + + Args: + source_code_contents: list of (filename, content) tuples + """ + system = ( + "A problem was introduced at this pipeline node. The problem exists in its output " + "but NOT in any of its inputs, so this node created it.\n\n" + "First, classify the root cause into one of three categories:\n" + "- prompt_fixable: The prompt has a gap or oversight that can be fixed by editing " + "the prompt text. Example: the prompt asks for budget estimates but doesn't require " + "sourcing or validation.\n" + "- domain_complexity: The topic is inherently uncertain, politically sensitive, or " + "requires specialized domain expertise that no prompt change can fully resolve. 
" + "Example: caste enumeration in India is politically contentious regardless of how " + "the prompt is worded.\n" + "- missing_input: The user's original plan description didn't provide enough detail " + "for this node to produce quality output. Example: the plan says 'open a shop' " + "without specifying location, budget, or target market.\n\n" + "Then examine the source code to identify the specific cause. Be specific — point " + "to lines or prompt phrases. Focus on the system prompt text and data transformation logic." + ) + source_sections = [] + for fname, content in source_code_contents: + source_sections.append(f"--- {fname} ---\n{content}") + source_text = "\n\n".join(source_sections) + + user = ( + f"Problem: {problem_description}\n" + f"Evidence from output: {evidence_quote}\n\n" + f"Source code files:\n{source_text}" + ) + return [ + ChatMessage(role=MessageRole.SYSTEM, content=system), + ChatMessage(role=MessageRole.USER, content=user), + ] diff --git a/worker_plan/worker_plan_internal/rca/registry.py b/worker_plan/worker_plan_internal/rca/registry.py new file mode 100644 index 00000000..3db15708 --- /dev/null +++ b/worker_plan/worker_plan_internal/rca/registry.py @@ -0,0 +1,89 @@ +# worker_plan/worker_plan_internal/rca/registry.py +"""DAG registry for RCA, built from Luigi task introspection. + +Replaces the former hand-maintained static registry with data extracted +from the actual pipeline via extract_dag. 
The public API is unchanged: + - find_node_by_filename(filename) -> NodeInfo | None + - get_upstream_files(node_name, output_dir) -> list[tuple[str, Path]] + - get_source_code_paths(node_name) -> list[Path] +""" +from dataclasses import dataclass +from pathlib import Path + +from worker_plan_internal.extract_dag import extract_dag + +# Base path for source code, relative to worker_plan/ +_SOURCE_BASE = Path(__file__).resolve().parent.parent.parent # worker_plan/ + + +@dataclass(frozen=True) +class NodeInput: + """One input to a pipeline node: the upstream node name and the artifact it provides.""" + from_node: str + artifact_path: str + + +@dataclass(frozen=True) +class NodeInfo: + """One pipeline node.""" + name: str + output_files: tuple[str, ...] + inputs: tuple[NodeInput, ...] = () + source_code_files: tuple[str, ...] = () + + +def _build_registry() -> tuple[NodeInfo, ...]: + """Build the registry from Luigi task introspection.""" + dag = extract_dag() + nodes = [] + for entry in dag["nodes"]: + output_files = tuple(a["path"] for a in entry["artifacts"]) + inputs = tuple( + NodeInput(from_node=inp["from_node"], artifact_path=inp["artifact_path"]) + for inp in entry["inputs"] + ) + nodes.append(NodeInfo( + name=entry["id"], + output_files=output_files, + inputs=inputs, + source_code_files=tuple(f["path"] for f in entry["source_files"]), + )) + return tuple(nodes) + + +# ── Build once at import time ────────────────────────────────────────── + +NODES: tuple[NodeInfo, ...] 
= _build_registry() + +_NODE_BY_NAME: dict[str, NodeInfo] = {n.name: n for n in NODES} +_NODE_BY_FILENAME: dict[str, NodeInfo] = {} +for _node in NODES: + for _fname in _node.output_files: + _NODE_BY_FILENAME[_fname] = _node + + +def find_node_by_filename(filename: str) -> NodeInfo | None: + """Given an output filename, return the node that produced it.""" + return _NODE_BY_FILENAME.get(filename) + + +def get_upstream_files(node_name: str, output_dir: Path) -> list[tuple[str, Path]]: + """Return (node_name, file_path) pairs for upstream nodes whose artifact exists on disk.""" + node = _NODE_BY_NAME.get(node_name) + if node is None: + return [] + + result = [] + for inp in node.inputs: + artifact_path = output_dir / inp.artifact_path + if artifact_path.exists(): + result.append((inp.from_node, artifact_path)) + return result + + +def get_source_code_paths(node_name: str) -> list[Path]: + """Return absolute paths to source code files for a node.""" + node = _NODE_BY_NAME.get(node_name) + if node is None: + return [] + return [_SOURCE_BASE / f for f in node.source_code_files] diff --git a/worker_plan/worker_plan_internal/rca/tests/__init__.py b/worker_plan/worker_plan_internal/rca/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/worker_plan/worker_plan_internal/rca/tests/test_output.py b/worker_plan/worker_plan_internal/rca/tests/test_output.py new file mode 100644 index 00000000..315b4a40 --- /dev/null +++ b/worker_plan/worker_plan_internal/rca/tests/test_output.py @@ -0,0 +1,142 @@ +# worker_plan/worker_plan_internal/rca/tests/test_output.py +import json +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory + +from worker_plan_internal.rca.tracer import ( + RCAResult, + TracedProblem, + TraceEntry, + OriginInfo, +) +from worker_plan_internal.rca.output import write_json_report, write_markdown_report + + +def _make_sample_result() -> RCAResult: + """Create a sample RCAResult for testing.""" + return RCAResult( 
+ starting_file="025-2-executive_summary.md", + problem_description="Budget is unvalidated", + output_dir="/tmp/test_output", + problems=[ + TracedProblem( + id="problem_001", + description="Budget of CZK 500,000 is unvalidated", + severity="HIGH", + starting_evidence="CZK 500,000", + trace=[ + TraceEntry(node="executive_summary", file="025-2-executive_summary.md", evidence="CZK 500,000", is_origin=False), + TraceEntry(node="project_plan", file="005-2-project_plan.md", evidence="Budget: 500k", is_origin=False), + TraceEntry(node="make_assumptions", file="003-5-make_assumptions.md", evidence="Assume budget of 500k", is_origin=True), + ], + origin_node="make_assumptions", + origin=OriginInfo( + node="make_assumptions", + file="003-5-make_assumptions.md", + source_code_files=["make_assumptions.py"], + category="prompt_fixable", + likely_cause="Prompt generates budget without data", + suggestion="Add validation step", + ), + depth=3, + ), + TracedProblem( + id="problem_002", + description="Missing market sizing", + severity="MEDIUM", + starting_evidence="growing Czech market", + trace=[ + TraceEntry(node="executive_summary", file="025-2-executive_summary.md", evidence="growing Czech market", is_origin=True), + ], + origin_node="executive_summary", + depth=1, + ), + ], + llm_calls_made=8, + ) + + +class TestWriteJsonReport(unittest.TestCase): + def test_writes_valid_json(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.json" + result = _make_sample_result() + write_json_report(result, output_path) + + self.assertTrue(output_path.exists()) + data = json.loads(output_path.read_text(encoding="utf-8")) + self.assertIn("input", data) + self.assertIn("problems", data) + self.assertIn("summary", data) + + def test_json_contains_correct_summary(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.json" + result = _make_sample_result() + write_json_report(result, output_path) + + data = 
json.loads(output_path.read_text(encoding="utf-8")) + summary = data["summary"] + self.assertEqual(summary["total_problems"], 2) + self.assertEqual(summary["deepest_origin_node"], "make_assumptions") + self.assertEqual(summary["deepest_origin_depth"], 3) + self.assertEqual(summary["llm_calls_made"], 8) + + def test_json_problems_sorted_by_depth(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.json" + result = _make_sample_result() + write_json_report(result, output_path) + + data = json.loads(output_path.read_text(encoding="utf-8")) + depths = [f["depth"] for f in data["problems"]] + self.assertEqual(depths, sorted(depths, reverse=True)) + + +class TestWriteMarkdownReport(unittest.TestCase): + def test_writes_markdown_file(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.md" + result = _make_sample_result() + write_markdown_report(result, output_path) + + self.assertTrue(output_path.exists()) + content = output_path.read_text(encoding="utf-8") + self.assertIn("# Root Cause Analysis Report", content) + + def test_markdown_contains_problem_details(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.md" + result = _make_sample_result() + write_markdown_report(result, output_path) + + content = output_path.read_text(encoding="utf-8") + self.assertIn("Budget of CZK 500,000 is unvalidated", content) + self.assertIn("make_assumptions", content) + self.assertIn("executive_summary", content) + + def test_markdown_contains_trace_table(self): + with TemporaryDirectory() as d: + output_path = Path(d) / "root_cause_analysis.md" + result = _make_sample_result() + write_markdown_report(result, output_path) + + content = output_path.read_text(encoding="utf-8") + self.assertIn("| Node |", content) + self.assertIn("| File |", content) + + def test_empty_result_produces_valid_markdown(self): + with TemporaryDirectory() as d: + output_path = Path(d) / 
"root_cause_analysis.md" + result = RCAResult( + starting_file="030-report.html", + problem_description="test", + output_dir="/tmp", + problems=[], + llm_calls_made=1, + ) + write_markdown_report(result, output_path) + + content = output_path.read_text(encoding="utf-8") + self.assertIn("Problems found:** 0", content) diff --git a/worker_plan/worker_plan_internal/rca/tests/test_prompts.py b/worker_plan/worker_plan_internal/rca/tests/test_prompts.py new file mode 100644 index 00000000..45fb4fae --- /dev/null +++ b/worker_plan/worker_plan_internal/rca/tests/test_prompts.py @@ -0,0 +1,138 @@ +# worker_plan/worker_plan_internal/rca/tests/test_prompts.py +import unittest +from llama_index.core.llms import ChatMessage, MessageRole +from worker_plan_internal.rca.prompts import ( + IdentifiedProblem, + ProblemIdentificationResult, + UpstreamCheckResult, + SourceCodeAnalysisResult, + build_problem_identification_messages, + build_upstream_check_messages, + build_source_code_analysis_messages, +) + + +class TestPydanticModels(unittest.TestCase): + def test_identified_problem_valid(self): + problem = IdentifiedProblem( + description="Budget figure is fabricated", + evidence="The budget is CZK 500,000", + severity="HIGH", + ) + self.assertEqual(problem.severity, "HIGH") + + def test_identified_problem_rejects_invalid_severity(self): + with self.assertRaises(Exception): + IdentifiedProblem( + description="test", + evidence="test", + severity="CRITICAL", + ) + + def test_problem_identification_result(self): + result = ProblemIdentificationResult(problems=[ + IdentifiedProblem(description="test", evidence="quote", severity="LOW"), + ]) + self.assertEqual(len(result.problems), 1) + + def test_upstream_check_result_found(self): + result = UpstreamCheckResult(found=True, evidence="quote", explanation="precursor") + self.assertTrue(result.found) + self.assertEqual(result.evidence, "quote") + + def test_upstream_check_result_not_found(self): + result = UpstreamCheckResult(found=False, 
evidence=None, explanation="clean") + self.assertFalse(result.found) + + def test_source_code_analysis_result(self): + result = SourceCodeAnalysisResult( + category="prompt_fixable", + likely_cause="prompt lacks validation", + relevant_code_section="system_prompt = ...", + suggestion="add grounding check", + ) + self.assertEqual(result.category, "prompt_fixable") + self.assertIsInstance(result.likely_cause, str) + + def test_source_code_analysis_rejects_invalid_category(self): + with self.assertRaises(Exception): + SourceCodeAnalysisResult( + category="unknown_category", + likely_cause="test", + relevant_code_section="test", + suggestion="test", + ) + + +class TestBuildProblemIdentificationMessages(unittest.TestCase): + def test_returns_chat_messages(self): + messages = build_problem_identification_messages( + filename="030-report.html", + file_content="report content", + user_problem_description="budget is wrong", + ) + self.assertIsInstance(messages, list) + self.assertEqual(len(messages), 2) + self.assertEqual(messages[0].role, MessageRole.SYSTEM) + self.assertEqual(messages[1].role, MessageRole.USER) + + def test_user_message_contains_inputs(self): + messages = build_problem_identification_messages( + filename="025-2-executive_summary.md", + file_content="# Summary\nBudget: 500k", + user_problem_description="fabricated budget", + ) + user_content = messages[1].content + self.assertIn("025-2-executive_summary.md", user_content) + self.assertIn("# Summary", user_content) + self.assertIn("fabricated budget", user_content) + + +class TestBuildUpstreamCheckMessages(unittest.TestCase): + def test_returns_chat_messages(self): + messages = build_upstream_check_messages( + problem_description="Budget is fabricated", + evidence_quote="CZK 500,000", + upstream_filename="005-2-project_plan.md", + upstream_file_content="# Project Plan\nBudget: 500k", + ) + self.assertIsInstance(messages, list) + self.assertEqual(len(messages), 2) + + def 
test_user_message_contains_problem_and_upstream(self): + messages = build_upstream_check_messages( + problem_description="Missing market sizing", + evidence_quote="growing Czech market", + upstream_filename="003-5-make_assumptions.md", + upstream_file_content="# Assumptions\nMarket is growing", + ) + user_content = messages[1].content + self.assertIn("Missing market sizing", user_content) + self.assertIn("growing Czech market", user_content) + self.assertIn("003-5-make_assumptions.md", user_content) + + +class TestBuildSourceCodeAnalysisMessages(unittest.TestCase): + def test_returns_chat_messages(self): + messages = build_source_code_analysis_messages( + problem_description="Budget fabricated", + evidence_quote="CZK 500,000", + source_code_contents=[ + ("nodes/make_assumptions.py", "class MakeAssumptionsTask: ..."), + ("assume/make_assumptions.py", "def execute(llm, query): ..."), + ], + ) + self.assertIsInstance(messages, list) + self.assertEqual(len(messages), 2) + + def test_user_message_contains_source_code(self): + messages = build_source_code_analysis_messages( + problem_description="Missing analysis", + evidence_quote="no data", + source_code_contents=[ + ("my_stage.py", "SYSTEM_PROMPT = 'Generate assumptions'"), + ], + ) + user_content = messages[1].content + self.assertIn("my_stage.py", user_content) + self.assertIn("SYSTEM_PROMPT", user_content) diff --git a/worker_plan/worker_plan_internal/rca/tests/test_registry.py b/worker_plan/worker_plan_internal/rca/tests/test_registry.py new file mode 100644 index 00000000..1ca79cf4 --- /dev/null +++ b/worker_plan/worker_plan_internal/rca/tests/test_registry.py @@ -0,0 +1,107 @@ +# worker_plan/worker_plan_internal/rca/tests/test_registry.py +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory +from worker_plan_internal.rca.registry import ( + NodeInfo, + NODES, + find_node_by_filename, + get_upstream_files, + get_source_code_paths, +) + + +class TestNodeInfo(unittest.TestCase): + def 
test_nodes_is_nonempty(self): + self.assertGreater(len(NODES), 40) + + def test_all_nodes_have_required_fields(self): + for node in NODES: + self.assertIsInstance(node.name, str, f"{node.name} name") + self.assertIsInstance(node.output_files, tuple, f"{node.name} output_files") + self.assertTrue(len(node.output_files) > 0, f"{node.name} has no output_files") + self.assertIsInstance(node.inputs, tuple, f"{node.name} inputs") + self.assertIsInstance(node.source_code_files, tuple, f"{node.name} source_code_files") + + def test_no_duplicate_node_names(self): + names = [n.name for n in NODES] + self.assertEqual(len(names), len(set(names))) + + def test_input_references_are_valid(self): + valid_names = {n.name for n in NODES} + for node in NODES: + for inp in node.inputs: + self.assertIn(inp.from_node, valid_names, f"{node.name} references unknown node '{inp.from_node}'") + + +class TestFindNodeByFilename(unittest.TestCase): + def test_find_report(self): + node = find_node_by_filename("030-report.html") + self.assertIsNotNone(node) + self.assertEqual(node.name, "report") + + def test_find_potential_levers_clean(self): + node = find_node_by_filename("002-10-potential_levers.json") + self.assertIsNotNone(node) + self.assertEqual(node.name, "potential_levers") + + def test_find_potential_levers_raw(self): + node = find_node_by_filename("002-9-potential_levers_raw.json") + self.assertIsNotNone(node) + self.assertEqual(node.name, "potential_levers") + + def test_find_executive_summary(self): + node = find_node_by_filename("025-2-executive_summary.md") + self.assertIsNotNone(node) + self.assertEqual(node.name, "executive_summary") + + def test_unknown_filename_returns_none(self): + node = find_node_by_filename("zzz-unknown.txt") + self.assertIsNone(node) + + +class TestGetUpstreamFiles(unittest.TestCase): + def test_setup_has_no_upstream(self): + with TemporaryDirectory() as d: + result = get_upstream_files("setup", Path(d)) + self.assertEqual(result, []) + + def 
test_potential_levers_upstream(self): + with TemporaryDirectory() as d: + output_dir = Path(d) + # Create the expected upstream files on disk + (output_dir / "001-2-plan.txt").write_text("plan", encoding="utf-8") + (output_dir / "002-6-identify_purpose.md").write_text("purpose", encoding="utf-8") + (output_dir / "002-8-plan_type.md").write_text("type", encoding="utf-8") + (output_dir / "002-0-extract_constraints.md").write_text("constraints", encoding="utf-8") + + result = get_upstream_files("potential_levers", output_dir) + node_names = [name for name, _ in result] + self.assertIn("setup", node_names) + self.assertIn("identify_purpose", node_names) + self.assertIn("plan_type", node_names) + self.assertIn("extract_constraints", node_names) + + def test_missing_files_are_skipped(self): + with TemporaryDirectory() as d: + output_dir = Path(d) + # Only create one of the upstream files + (output_dir / "001-2-plan.txt").write_text("plan", encoding="utf-8") + + result = get_upstream_files("potential_levers", output_dir) + node_names = [name for name, _ in result] + self.assertIn("setup", node_names) + # The others should be skipped because their files don't exist + self.assertNotIn("identify_purpose", node_names) + + +class TestGetSourceCodePaths(unittest.TestCase): + def test_potential_levers_source(self): + paths = get_source_code_paths("potential_levers") + filenames = [p.name for p in paths] + self.assertIn("potential_levers.py", filenames) + self.assertIn("identify_potential_levers.py", filenames) + + def test_unknown_node_returns_empty(self): + paths = get_source_code_paths("nonexistent_node") + self.assertEqual(paths, []) diff --git a/worker_plan/worker_plan_internal/rca/tests/test_tracer.py b/worker_plan/worker_plan_internal/rca/tests/test_tracer.py new file mode 100644 index 00000000..aca22d40 --- /dev/null +++ b/worker_plan/worker_plan_internal/rca/tests/test_tracer.py @@ -0,0 +1,440 @@ +# worker_plan/worker_plan_internal/rca/tests/test_tracer.py +"""Tests for 
the root cause analyzer recursive algorithm. + +Since ResponseMockLLM does NOT support as_structured_llm(), we mock the three +private LLM-calling methods (_identify_problems, _check_upstream, +_analyze_source_code) directly. This tests the tracing logic — recursion, +deduplication, max depth — which is the important part. +""" +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import patch + +from worker_plan_internal.rca.tracer import ( + RootCauseAnalyzer, + RCAResult, + TracedProblem, + TraceEntry, + OriginInfo, +) +from worker_plan_internal.rca.prompts import ( + ProblemIdentificationResult, + IdentifiedProblem, + UpstreamCheckResult, +) +from worker_plan_internal.llm_util.response_mockllm import ResponseMockLLM +from worker_plan_internal.llm_util.llm_executor import LLMExecutor, LLMModelWithInstance + + +def _make_executor() -> LLMExecutor: + """Create a dummy LLMExecutor (won't actually be called when methods are mocked).""" + llm = ResponseMockLLM(responses=["unused"]) + llm_models = LLMModelWithInstance.from_instances([llm]) + return LLMExecutor(llm_models=llm_models) + + +def _make_tracer(output_dir: Path, max_depth: int = 15, verbose: bool = False) -> RootCauseAnalyzer: + """Create a RootCauseAnalyzer with a dummy executor.""" + executor = _make_executor() + return RootCauseAnalyzer( + output_dir=output_dir, + llm_executor=executor, + max_depth=max_depth, + verbose=verbose, + ) + + +class TestRCAResult(unittest.TestCase): + def test_dataclass_creation(self): + result = RCAResult( + starting_file="030-report.html", + problem_description="test", + output_dir="/tmp/test", + problems=[], + llm_calls_made=0, + ) + self.assertEqual(result.starting_file, "030-report.html") + self.assertEqual(len(result.problems), 0) + self.assertEqual(result.llm_calls_made, 0) + + def test_dataclass_with_problems(self): + problem = TracedProblem( + id="problem_001", + description="Budget fabricated", + severity="HIGH", + 
starting_evidence="CZK 500,000", + trace=[TraceEntry(node="test", file="test.md", evidence="ev")], + ) + result = RCAResult( + starting_file="test.md", + problem_description="test", + output_dir="/tmp/test", + problems=[problem], + llm_calls_made=1, + ) + self.assertEqual(len(result.problems), 1) + self.assertEqual(result.problems[0].severity, "HIGH") + + +class TestTracedProblem(unittest.TestCase): + def test_defaults(self): + problem = TracedProblem( + id="problem_001", + description="test", + severity="LOW", + starting_evidence="ev", + trace=[], + ) + self.assertIsNone(problem.origin_node) + self.assertIsNone(problem.origin) + self.assertEqual(problem.depth, 0) + self.assertTrue(problem.trace_complete) + + +class TestRootCauseAnalyzerPhase1(unittest.TestCase): + """Test problem identification (Phase 1) with mocked LLM methods.""" + + def test_identify_problems(self): + """The analyzer should produce TracedProblem objects from Phase 1 identification.""" + with TemporaryDirectory() as d: + output_dir = Path(d) + # Create a minimal output file + report_file = output_dir / "025-2-executive_summary.md" + report_file.write_text("# Summary\nBudget: CZK 500,000", encoding="utf-8") + + tracer = _make_tracer(output_dir) + + # Mock Phase 1: identify problems + mock_identification = ProblemIdentificationResult( + problems=[ + IdentifiedProblem( + description="Budget is unvalidated", + evidence="CZK 500,000", + severity="HIGH", + ) + ] + ) + + with patch.object(tracer, '_identify_problems', return_value=mock_identification), \ + patch.object(tracer, '_analyze_source_code') as mock_analyze: + result = tracer.trace("025-2-executive_summary.md", "budget is unvalidated") + + self.assertIsInstance(result, RCAResult) + self.assertGreaterEqual(len(result.problems), 1) + problem = result.problems[0] + self.assertEqual(problem.description, "Budget is unvalidated") + self.assertEqual(problem.severity, "HIGH") + + def test_file_not_found_raises(self): + """The tracer should raise 
FileNotFoundError for missing starting files.""" + with TemporaryDirectory() as d: + tracer = _make_tracer(Path(d)) + with self.assertRaises(FileNotFoundError): + tracer.trace("nonexistent.md", "test") + + +class TestRootCauseAnalyzerUpstreamTrace(unittest.TestCase): + """Test upstream tracing (Phase 2) with a simple two-level chain.""" + + def test_traces_problem_upstream(self): + with TemporaryDirectory() as d: + output_dir = Path(d) + # Create files for a chain: executive_summary -> project_plan -> setup + (output_dir / "025-2-executive_summary.md").write_text("Budget: CZK 500,000", encoding="utf-8") + (output_dir / "005-2-project_plan.md").write_text("Budget: CZK 500,000", encoding="utf-8") + (output_dir / "001-2-plan.txt").write_text("Open a tea shop", encoding="utf-8") + # Create other upstream files that executive_summary depends on + (output_dir / "002-14-strategic_decisions.md").write_text("decisions", encoding="utf-8") + (output_dir / "002-19-scenarios.md").write_text("scenarios", encoding="utf-8") + (output_dir / "003-10-consolidate_assumptions_full.md").write_text("assumptions", encoding="utf-8") + + tracer = _make_tracer(output_dir) + + # Mock Phase 1: identify problems + mock_identification = ProblemIdentificationResult( + problems=[ + IdentifiedProblem( + description="Budget fabricated", + evidence="CZK 500,000", + severity="HIGH", + ) + ] + ) + + # Track upstream check calls to return different results per file + upstream_call_count = 0 + upstream_responses = {} + + def mock_check_upstream(problem_desc, evidence, upstream_filename, upstream_content): + nonlocal upstream_call_count + upstream_call_count += 1 + # project_plan has the problem; others are clean + if "project_plan" in upstream_filename: + return UpstreamCheckResult( + found=True, + evidence="Budget: CZK 500,000", + explanation="Budget originates here", + ) + else: + return UpstreamCheckResult( + found=False, + evidence=None, + explanation="clean", + ) + + with patch.object(tracer, 
'_identify_problems', return_value=mock_identification), \ + patch.object(tracer, '_check_upstream', side_effect=mock_check_upstream), \ + patch.object(tracer, '_analyze_source_code'): + result = tracer.trace("025-2-executive_summary.md", "budget is fabricated") + + self.assertEqual(len(result.problems), 1) + problem = result.problems[0] + # The trace should include at least executive_summary and project_plan + trace_nodes = [entry.node for entry in problem.trace] + self.assertIn("executive_summary", trace_nodes) + self.assertIn("project_plan", trace_nodes) + # Origin should be project_plan (problem found there but not in its upstream 'setup') + self.assertEqual(problem.origin_node, "project_plan") + + def test_deduplication_works(self): + """Stages already checked for the same problem should be skipped.""" + with TemporaryDirectory() as d: + output_dir = Path(d) + # executive_summary depends on strategic_decisions_markdown, scenarios_markdown, etc. + # project_plan also depends on strategic_decisions_markdown, scenarios_markdown. + # When we trace through project_plan, those shared upstreams should be skipped. 
+ (output_dir / "025-2-executive_summary.md").write_text("Budget: 500k", encoding="utf-8") + (output_dir / "005-2-project_plan.md").write_text("Budget: 500k", encoding="utf-8") + (output_dir / "001-2-plan.txt").write_text("Open a tea shop", encoding="utf-8") + (output_dir / "002-14-strategic_decisions.md").write_text("decisions", encoding="utf-8") + (output_dir / "002-19-scenarios.md").write_text("scenarios", encoding="utf-8") + (output_dir / "003-10-consolidate_assumptions_full.md").write_text("assumptions", encoding="utf-8") + + tracer = _make_tracer(output_dir) + + mock_identification = ProblemIdentificationResult( + problems=[ + IdentifiedProblem(description="Budget fabricated", evidence="500k", severity="HIGH") + ] + ) + + checked_stages = [] + + def mock_check_upstream(problem_desc, evidence, upstream_filename, upstream_content): + checked_stages.append(upstream_filename) + if "project_plan" in upstream_filename: + return UpstreamCheckResult(found=True, evidence="500k", explanation="found here") + return UpstreamCheckResult(found=False, evidence=None, explanation="clean") + + with patch.object(tracer, '_identify_problems', return_value=mock_identification), \ + patch.object(tracer, '_check_upstream', side_effect=mock_check_upstream), \ + patch.object(tracer, '_analyze_source_code'): + result = tracer.trace("025-2-executive_summary.md", "budget fabricated") + + # Count unique filenames checked — dedup should prevent re-checking + # strategic_decisions and scenarios at the project_plan level + unique_checked = set(checked_stages) + # Each file should appear at most once + self.assertEqual(len(checked_stages), len(unique_checked), + f"Dedup failed: checked {checked_stages}") + + +class TestRootCauseAnalyzerMaxDepth(unittest.TestCase): + def test_respects_max_depth_zero(self): + """With max_depth=0, no upstream tracing happens.""" + with TemporaryDirectory() as d: + output_dir = Path(d) + (output_dir / "025-2-executive_summary.md").write_text("Budget: 500k", 
encoding="utf-8") + + tracer = _make_tracer(output_dir, max_depth=0) + + mock_identification = ProblemIdentificationResult( + problems=[ + IdentifiedProblem(description="test problem", evidence="500k", severity="LOW") + ] + ) + + with patch.object(tracer, '_identify_problems', return_value=mock_identification), \ + patch.object(tracer, '_check_upstream') as mock_check, \ + patch.object(tracer, '_analyze_source_code'): + result = tracer.trace("025-2-executive_summary.md", "test") + + self.assertEqual(len(result.problems), 1) + # With max_depth=0, no upstream tracing happens + self.assertEqual(len(result.problems[0].trace), 1) # only the starting file + # _check_upstream should never have been called + mock_check.assert_not_called() + + def test_max_depth_limits_recursion(self): + """With max_depth=1, tracing should stop after one level of upstream.""" + with TemporaryDirectory() as d: + output_dir = Path(d) + (output_dir / "025-2-executive_summary.md").write_text("Budget: 500k", encoding="utf-8") + (output_dir / "005-2-project_plan.md").write_text("Budget: 500k", encoding="utf-8") + (output_dir / "001-2-plan.txt").write_text("plan", encoding="utf-8") + (output_dir / "002-14-strategic_decisions.md").write_text("decisions", encoding="utf-8") + (output_dir / "002-19-scenarios.md").write_text("scenarios", encoding="utf-8") + (output_dir / "003-10-consolidate_assumptions_full.md").write_text("assumptions", encoding="utf-8") + + tracer = _make_tracer(output_dir, max_depth=1) + + mock_identification = ProblemIdentificationResult( + problems=[ + IdentifiedProblem(description="problem", evidence="500k", severity="MEDIUM") + ] + ) + + def always_found(problem_desc, evidence, upstream_filename, upstream_content): + return UpstreamCheckResult(found=True, evidence="500k", explanation="found") + + with patch.object(tracer, '_identify_problems', return_value=mock_identification), \ + patch.object(tracer, '_check_upstream', side_effect=always_found), \ + patch.object(tracer, 
# --- test_tracer.py (continued) -------------------------------------------
# NOTE(review): the opening lines of the preceding max-depth test sit above
# this chunk in the original file; only the fully visible classes below are
# reconstructed here.


class TestRootCauseAnalyzerSourceCodeAnalysis(unittest.TestCase):
    """Phase 3 (source code analysis) must run exactly once, at the origin node."""

    def test_source_code_analysis_called_at_origin(self):
        with TemporaryDirectory() as tmp:
            out_dir = Path(tmp)
            (out_dir / "025-2-executive_summary.md").write_text("Budget: 500k", encoding="utf-8")

            tracer = _make_tracer(out_dir)

            identification = ProblemIdentificationResult(
                problems=[
                    IdentifiedProblem(description="problem", evidence="500k", severity="HIGH")
                ]
            )

            with patch.object(tracer, '_identify_problems', return_value=identification), \
                 patch.object(tracer, '_analyze_source_code') as mock_analyze:
                tracer.trace("025-2-executive_summary.md", "test")

            # Exactly one Phase 3 invocation, targeting the origin node.
            mock_analyze.assert_called_once()
            call = mock_analyze.call_args
            # Positional args are (TracedProblem, node_name, ...).
            self.assertEqual(call[0][1], "executive_summary")

    def test_source_code_analysis_called_at_deep_origin(self):
        """Phase 3 should run when the origin is found at a deeper upstream node."""
        with TemporaryDirectory() as tmp:
            out_dir = Path(tmp)
            # Chain under test: executive_summary -> project_plan (origin).
            fixtures = (
                ("025-2-executive_summary.md", "Budget: 500k"),
                ("005-2-project_plan.md", "Budget: 500k"),
                ("001-2-plan.txt", "Open a tea shop"),
                ("002-14-strategic_decisions.md", "decisions"),
                ("002-19-scenarios.md", "scenarios"),
                ("003-10-consolidate_assumptions_full.md", "assumptions"),
            )
            for fname, text in fixtures:
                (out_dir / fname).write_text(text, encoding="utf-8")

            tracer = _make_tracer(out_dir)

            identification = ProblemIdentificationResult(
                problems=[
                    IdentifiedProblem(description="Budget fabricated", evidence="500k", severity="HIGH")
                ]
            )

            def fake_check_upstream(problem_desc, evidence, upstream_filename, upstream_content):
                # project_plan carries the problem; every other upstream is clean.
                if "project_plan" in upstream_filename:
                    return UpstreamCheckResult(
                        found=True, evidence="Budget: 500k", explanation="Budget originates here"
                    )
                return UpstreamCheckResult(found=False, evidence=None, explanation="clean")

            with patch.object(tracer, '_identify_problems', return_value=identification), \
                 patch.object(tracer, '_check_upstream', side_effect=fake_check_upstream), \
                 patch.object(tracer, '_analyze_source_code') as mock_analyze:
                tracer.trace("025-2-executive_summary.md", "budget fabricated")

            # Phase 3 must fire at the deep origin (project_plan), not the start node.
            mock_analyze.assert_called_once()
            call = mock_analyze.call_args
            self.assertEqual(call[0][1], "project_plan")


class TestRootCauseAnalyzerMultipleProblems(unittest.TestCase):
    """Each identified problem must be traced independently."""

    def test_traces_multiple_problems(self):
        with TemporaryDirectory() as tmp:
            out_dir = Path(tmp)
            (out_dir / "025-2-executive_summary.md").write_text("Budget: 500k\nTimeline: 2 months", encoding="utf-8")

            tracer = _make_tracer(out_dir)

            identification = ProblemIdentificationResult(
                problems=[
                    IdentifiedProblem(description="Budget fabricated", evidence="500k", severity="HIGH"),
                    IdentifiedProblem(description="Timeline unrealistic", evidence="2 months", severity="MEDIUM"),
                ]
            )

            with patch.object(tracer, '_identify_problems', return_value=identification), \
                 patch.object(tracer, '_analyze_source_code'):
                result = tracer.trace("025-2-executive_summary.md", "multiple issues")

            self.assertEqual(len(result.problems), 2)
            seen = {p.description for p in result.problems}
            self.assertIn("Budget fabricated", seen)
            self.assertIn("Timeline unrealistic", seen)
            # IDs must be unique across problems.
            ids = [p.id for p in result.problems]
            self.assertEqual(len(ids), len(set(ids)))


class TestRootCauseAnalyzerSortsByDepth(unittest.TestCase):
    """Results must come back sorted by depth, deepest origin first."""

    def test_problems_sorted_by_depth_descending(self):
        with TemporaryDirectory() as tmp:
            out_dir = Path(tmp)
            for fname in (
                "025-2-executive_summary.md",
                "005-2-project_plan.md",
                "002-14-strategic_decisions.md",
                "002-19-scenarios.md",
                "003-10-consolidate_assumptions_full.md",
            ):
                (out_dir / fname).write_text("content", encoding="utf-8")

            tracer = _make_tracer(out_dir)

            identification = ProblemIdentificationResult(
                problems=[
                    IdentifiedProblem(description="shallow problem", evidence="ev1", severity="LOW"),
                    IdentifiedProblem(description="deep problem", evidence="ev2", severity="HIGH"),
                ]
            )

            call_count = 0

            def fake_check_upstream(problem_desc, evidence, upstream_filename, upstream_content):
                nonlocal call_count
                call_count += 1
                # Only "deep problem" is found upstream, in project_plan.
                if "deep problem" in problem_desc and "project_plan" in upstream_filename:
                    return UpstreamCheckResult(found=True, evidence="ev2", explanation="found")
                return UpstreamCheckResult(found=False, evidence=None, explanation="clean")

            with patch.object(tracer, '_identify_problems', return_value=identification), \
                 patch.object(tracer, '_check_upstream', side_effect=fake_check_upstream), \
                 patch.object(tracer, '_analyze_source_code'):
                result = tracer.trace("025-2-executive_summary.md", "test")

            self.assertEqual(len(result.problems), 2)
            # Deepest origin first.
            self.assertGreaterEqual(result.problems[0].depth, result.problems[1].depth)


if __name__ == "__main__":
    unittest.main()


# --- worker_plan/worker_plan_internal/rca/tracer.py ------------------------
# (new file; begins here in the original patch)

# worker_plan/worker_plan_internal/rca/tracer.py
"""Recursive depth-first root cause analyzer for PlanExe pipeline outputs."""
# NOTE(review): `from __future__ import annotations` belongs at the very top
# of tracer.py in the original; it is elided in this flattened chunk. All
# annotations below are native syntax on the project's Python 3.13.

import json
import logging
import sys
from dataclasses import dataclass
from datetime import datetime, UTC
from pathlib import Path

from llama_index.core.llms.llm import LLM

from worker_plan_internal.rca.registry import (
    find_node_by_filename,
    get_upstream_files,
    get_source_code_paths,
)
from worker_plan_internal.rca.prompts import (
    ProblemIdentificationResult,
    UpstreamCheckResult,
    SourceCodeAnalysisResult,
    build_problem_identification_messages,
    build_upstream_check_messages,
    build_source_code_analysis_messages,
)
from worker_plan_internal.llm_util.llm_executor import LLMExecutor

logger = logging.getLogger(__name__)


@dataclass
class TraceEntry:
    """One hop in a problem's upstream trace."""
    node: str              # pipeline node name this hop was checked against
    file: str              # output filename in which the evidence was found
    evidence: str          # verbatim evidence quoted by the LLM
    is_origin: bool = False  # True on the hop identified as the problem's origin


@dataclass
class OriginInfo:
    """Source code analysis at a problem's origin node."""
    node: str
    file: str
    source_code_files: list[str]
    category: str  # "prompt_fixable", "domain_complexity", or "missing_input"
    likely_cause: str
    suggestion: str


@dataclass
class TracedProblem:
    """A fully traced problem with its upstream chain."""
    id: str                       # stable id, e.g. "problem_001"
    description: str
    severity: str
    starting_evidence: str        # evidence in the starting file
    trace: list[TraceEntry]       # hop chain, starting node first
    origin_node: str | None = None
    origin: OriginInfo | None = None
    depth: int = 0                # hops from starting file to origin
    trace_complete: bool = True   # False when the depth budget cut the trace short
origin: OriginInfo | None = None + depth: int = 0 + trace_complete: bool = True + + +@dataclass +class RCAResult: + """Complete result of a root cause analysis run.""" + starting_file: str + problem_description: str + output_dir: str + problems: list[TracedProblem] + llm_calls_made: int = 0 + + +class EventLogger: + """Appends JSON events to a JSONL file for live monitoring. + + Usage: tail -f events.jsonl + """ + + def __init__(self, path: Path | None): + self._path = path + if self._path: + self._path.parent.mkdir(parents=True, exist_ok=True) + # Truncate on start so each run is a fresh log + self._path.write_text("", encoding="utf-8") + + def log(self, event_type: str, **data: object) -> None: + if self._path is None: + return + entry = { + "timestamp": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"), + "event": event_type, + **data, + } + with open(self._path, "a", encoding="utf-8") as f: + f.write(json.dumps(entry, ensure_ascii=False) + "\n") + + +class RootCauseAnalyzer: + """Traces problems upstream through the PlanExe pipeline DAG.""" + + def __init__( + self, + output_dir: Path, + llm_executor: LLMExecutor, + max_depth: int = 15, + verbose: bool = False, + events_path: Path | None = None, + ): + self.output_dir = output_dir + self.llm_executor = llm_executor + self.max_depth = max_depth + self.verbose = verbose + self._llm_calls = 0 + self._checked: set[tuple[str, str]] = set() # (node_name, problem_description) dedup + self._events = EventLogger(events_path) + + def trace(self, starting_file: str, problem_description: str) -> RCAResult: + """Main entry point. 
Identify problems and trace each upstream.""" + self._llm_calls = 0 + self._checked.clear() + + file_path = self.output_dir / starting_file + if not file_path.exists(): + raise FileNotFoundError(f"Starting file not found: {file_path}") + + file_content = file_path.read_text(encoding="utf-8") + found_node = find_node_by_filename(starting_file) + node_name = found_node.name if found_node else "unknown" + + # Phase 1: Identify problems + self._log(f"Phase 1: Identifying problems in {starting_file}") + self._events.log("phase1_start", file=starting_file, node=node_name) + identified = self._identify_problems(starting_file, file_content, problem_description) + self._log(f" Found {len(identified.problems)} problem(s)") + self._events.log("phase1_done", problems_found=len(identified.problems), + summaries=[p.description for p in identified.problems]) + + traced_problems: list[TracedProblem] = [] + for i, problem in enumerate(identified.problems): + problem_id = f"problem_{i + 1:03d}" + self._log(f"\nTracing {problem_id}: {problem.description}") + self._events.log("trace_problem_start", problem_id=problem_id, + problem_index=i + 1, problem_total=len(identified.problems), + description=problem.description, severity=problem.severity) + + starting_entry = TraceEntry( + node=node_name, + file=starting_file, + evidence=problem.evidence, + is_origin=False, + ) + + traced = TracedProblem( + id=problem_id, + description=problem.description, + severity=problem.severity, + starting_evidence=problem.evidence, + trace=[starting_entry], + ) + + if found_node and self.max_depth > 0: + self._trace_upstream(traced, node_name, problem.description, problem.evidence, depth=0) + + # Mark the last trace entry as origin if no deeper origin was found + if traced.origin_node is None and traced.trace: + last = traced.trace[-1] + last.is_origin = True + traced.origin_node = last.node + traced.depth = len(traced.trace) - 1 + + # Phase 3: Source code analysis at origin (always, when origin is known) 
+ if traced.origin_node is not None: + self._events.log("phase3_start", problem_id=problem_id, origin_node=traced.origin_node) + self._analyze_source_code( + traced, traced.origin_node, problem.description, + next((e.evidence for e in traced.trace if e.node == traced.origin_node), problem.evidence) + ) + + self._events.log("trace_problem_done", problem_id=problem_id, + origin_node=traced.origin_node, depth=traced.depth) + traced_problems.append(traced) + + # Sort by depth (deepest origin first) + traced_problems.sort(key=lambda f: f.depth, reverse=True) + + self._events.log("trace_complete", total_problems=len(traced_problems), + llm_calls=self._llm_calls) + + return RCAResult( + starting_file=starting_file, + problem_description=problem_description, + output_dir=str(self.output_dir), + problems=traced_problems, + llm_calls_made=self._llm_calls, + ) + + def _identify_problems(self, filename: str, file_content: str, user_description: str) -> ProblemIdentificationResult: + """Phase 1: Ask LLM to identify discrete problems in the starting file.""" + messages = build_problem_identification_messages(filename, file_content, user_description) + + def execute(llm: LLM) -> ProblemIdentificationResult: + sllm = llm.as_structured_llm(ProblemIdentificationResult) + response = sllm.chat(messages) + return response.raw + + self._llm_calls += 1 + return self.llm_executor.run(execute) + + def _check_upstream(self, problem_description: str, evidence: str, upstream_filename: str, upstream_content: str) -> UpstreamCheckResult: + """Phase 2: Ask LLM if a problem exists in an upstream file.""" + messages = build_upstream_check_messages(problem_description, evidence, upstream_filename, upstream_content) + + def execute(llm: LLM) -> UpstreamCheckResult: + sllm = llm.as_structured_llm(UpstreamCheckResult) + response = sllm.chat(messages) + return response.raw + + self._llm_calls += 1 + return self.llm_executor.run(execute) + + def _trace_upstream( + self, + traced: TracedProblem, + 
current_node: str, + problem_description: str, + evidence: str, + depth: int, + ) -> None: + """Recursively trace a problem through upstream nodes.""" + if depth >= self.max_depth: + traced.trace_complete = False + self._log(f" Max depth {self.max_depth} reached at {current_node}") + return + + upstream_files = get_upstream_files(current_node, self.output_dir) + if not upstream_files: + return # No upstream = this is the origin + + found_upstream = False + for upstream_name, upstream_path in upstream_files: + # Dedup key uses problem_description so different problems get independent + # upstream checks. If the LLM returns duplicate descriptions, they + # share check results. + dedup_key = (upstream_name, problem_description) + if dedup_key in self._checked: + self._log(f" Skipping {upstream_name} (already checked for this problem)") + continue + self._checked.add(dedup_key) + + upstream_content = upstream_path.read_text(encoding="utf-8") + self._log(f" Checking upstream: {upstream_name} ({upstream_path.name})") + self._events.log("upstream_check", node=upstream_name, + file=upstream_path.name, depth=depth) + + result = self._check_upstream(problem_description, evidence, upstream_path.name, upstream_content) + + if result.found: + self._log(f" -> FOUND in {upstream_name}") + self._events.log("upstream_found", node=upstream_name, + file=upstream_path.name, depth=depth) + found_upstream = True + entry = TraceEntry( + node=upstream_name, + file=upstream_path.name, + evidence=result.evidence or "", + is_origin=False, + ) + traced.trace.append(entry) + + # Recurse deeper + self._trace_upstream( + traced, upstream_name, problem_description, + result.evidence or evidence, depth + 1, + ) + # First-match-wins: once an origin is found in one upstream + # branch, stop exploring others. 
+ if traced.origin_node is not None: + return + + if not found_upstream: + # Current node is the origin — problem exists here but not in any upstream + traced.origin_node = current_node + traced.depth = len(traced.trace) - 1 + self._events.log("origin_found", node=current_node, depth=traced.depth) + # Mark the current node entry as origin + for entry in traced.trace: + if entry.node == current_node: + entry.is_origin = True + + def _analyze_source_code(self, traced: TracedProblem, node_name: str, problem_description: str, evidence: str) -> None: + """Phase 3: Analyze source code at the origin node.""" + source_paths = get_source_code_paths(node_name) + if not source_paths: + return + + source_contents: list[tuple[str, str]] = [] + for path in source_paths: + if path.exists(): + content = path.read_text(encoding="utf-8") + short_name = f"{path.parent.name}/{path.name}" + source_contents.append((short_name, content)) + + if not source_contents: + return + + self._log(f" Phase 3: Analyzing source code for {node_name}") + messages = build_source_code_analysis_messages(problem_description, evidence, source_contents) + + def execute(llm: LLM) -> SourceCodeAnalysisResult: + sllm = llm.as_structured_llm(SourceCodeAnalysisResult) + response = sllm.chat(messages) + return response.raw + + self._llm_calls += 1 + try: + analysis = self.llm_executor.run(execute) + source_file_names = [name for name, _ in source_contents] + traced.origin = OriginInfo( + node=node_name, + file=traced.trace[-1].file if traced.trace else "", + source_code_files=source_file_names, + category=analysis.category, + likely_cause=analysis.likely_cause, + suggestion=analysis.suggestion, + ) + except Exception as e: + logger.warning(f"Source code analysis failed for {node_name}: {e}") + + def _log(self, message: str) -> None: + """Print to stderr if verbose mode is enabled.""" + if self.verbose: + print(message, file=sys.stderr)