diff --git a/src/fromager/commands/graph.py b/src/fromager/commands/graph.py index cf3fab98..83548e77 100644 --- a/src/fromager/commands/graph.py +++ b/src/fromager/commands/graph.py @@ -784,3 +784,361 @@ def n2s(nodes: typing.Iterable[DependencyNode]) -> str: topo.done(*nodes_to_build) print(f"\nBuilding {len(graph)} packages in {rounds} rounds.") + + +# --------------------------------------------------------------------------- +# graph check — pre-rebuild structural and conflict analysis +# --------------------------------------------------------------------------- + + +def _check_well_formed(graph_dict: dict[str, typing.Any]) -> list[str]: + """Check that all edge targets exist as nodes in the graph.""" + dangling: dict[str, int] = {} + for _key, node_dict in graph_dict.items(): + for edge in node_dict.get("edges", []): + target = edge["key"] + if target not in graph_dict: + dangling[target] = dangling.get(target, 0) + 1 + return [ + f"DANGLING EDGE: {target} not in graph ({n} edge(s))" + for target, n in sorted(dangling.items()) + ] + + +def _check_acyclicity( + graph_dict: dict[str, typing.Any], +) -> tuple[list[str], list[str]]: + """Check for cycles and self-loops in the raw graph dict. + + Operates on the raw JSON dict rather than DependencyGraph because + from_dict absorbs self-loops and flattens cycles during construction, + making them undetectable afterward. + + Returns (cycles, self_loops). Cycles are errors; self-loops are + warnings (common in extras like safetensors[numpy] and harmless + for install deps). + """ + cycles: list[str] = [] + self_loops: list[str] = [] + + # Self-loops: edge target equals the node's own key + for key, node_dict in graph_dict.items(): + for edge in node_dict.get("edges", []): + if edge["key"] == key: + self_loops.append( + f"SELF-LOOP: {key} depends on itself (req: {edge['req']})" + ) + + # Cycle detection via DFS on raw adjacency + # (0=unvisited, 1=in-stack, 2=done) + unvisited, in_stack, done = 0, 1, 2 + color: dict[str, int] = {k: unvisited for k in graph_dict} + path: list[str] = [] + + def dfs(key: str) -> None: + color[key] = in_stack + path.append(key) + for edge in graph_dict.get(key, {}).get("edges", []): + tgt = edge["key"] + if tgt == key: + continue # self-loops already reported + if tgt not in color: + continue # dangling edge, reported by well-formed check + if color[tgt] == in_stack: + cycle_start = path.index(tgt) + cycle = path[cycle_start:] + [tgt] + cycles.append(f"CYCLE: {' -> '.join(cycle)}") + elif color[tgt] == unvisited: + dfs(tgt) + path.pop() + color[key] = done + + for key in graph_dict: + if color[key] == unvisited: + dfs(key) + + return cycles, self_loops + + +def _classify_conflicts( + graph: DependencyGraph, +) -> list[dict[str, typing.Any]]: + """Classify multi-version packages as collapsible or required. + + Same specifier-matching logic as show_explain_duplicates: for each + multi-version package, check if any single version satisfies all + consumer specifiers. Collapsible means a feasible pin exists; + required means no single version works. + """ + conflicts = graph.get_install_dependency_versions() + entries: list[dict[str, typing.Any]] = [] + + for dep_name, nodes in sorted(conflicts.items()): + versions = [node.version for node in nodes] + if len(versions) <= 1: + continue + + usable_versions: dict[str, list[str]] = {} + user_counter = 0 + tightest: tuple[str, str, int] | None = None + + for node in sorted(nodes, key=lambda x: x.version): + parents_by_req: dict[Requirement, set[str]] = {} + for parent_edge in node.get_incoming_install_edges(): + parents_by_req.setdefault(parent_edge.req, set()).add( + parent_edge.destination_node.key + ) + # No top-level self-vote: matches show_explain_duplicates and + # is stricter than write_constraints_file (which self-votes + # without specifier checking). Preserves conservatism. + for req, parents in parents_by_req.items(): + match_versions = [str(v) for v in req.specifier.filter(versions)] + user_counter += len(parents) + for mv in match_versions: + usable_versions.setdefault(mv, []).extend(parents) + n_match = len(match_versions) + pick_parent = sorted(parents)[0] + if tightest is None or n_match < tightest[2]: + tightest = (str(req), pick_parent, n_match) + + # Sort descending to prefer highest version, matching + # write_constraints_file's reversed(sorted(...)) + all_usable = sorted( + (v for v, users in usable_versions.items() if len(users) == user_counter), + key=lambda s: Version(s), + reverse=True, + ) + + entry: dict[str, typing.Any] = { + "name": str(dep_name), + "versions": [str(v) for v in sorted(versions)], + } + if tightest: + entry["tightest_spec"] = tightest[0] + entry["tightest_parent"] = tightest[1] + if all_usable: + entry["pin"] = all_usable[0] # highest feasible version + entry["also"] = all_usable[1:] + else: + entry["pin"] = None + entry["also"] = [] + entries.append(entry) + + return entries + + +def _print_check( + wf_issues: list[str], + cycles: list[str], + self_loops: list[str], + entries: list[dict[str, typing.Any]], + n_nodes: int, + as_json: bool = False, + as_constraints: bool = False, +) -> bool: + """Print check results. Returns True if all checks pass. + + Self-loops are warnings (printed but don't cause failure). + Cycles, dangling edges, and version conflicts cause failure. + """ + n_multi = len(entries) + collapsible = [e for e in entries if e.get("pin")] + required = [e for e in entries if not e.get("pin")] + + if as_constraints: + for e in collapsible: + print(f"{e['name']}=={e['pin']}") + return not required and not wf_issues and not cycles + + if as_json: + out: dict[str, typing.Any] = { + "well_formed": {"pass": not wf_issues, "issues": wf_issues}, + "acyclic": { + "pass": not cycles, + "cycles": cycles, + "self_loops": self_loops, + }, + "version_unique": {"pass": n_multi == 0, "n_conflicts": n_multi}, + "conflict_analysis": entries, + "build_efficiency": { + "total_wheels": n_nodes, + "extra_builds": sum(len(e["versions"]) - 1 for e in collapsible), + }, + } + print(json.dumps(out, indent=2)) + return not wf_issues and not cycles and not n_multi + + # Human-readable output + all_pass = True + print("=== Graph Check ===\n") + + wf_status = "PASS" if not wf_issues else "FAIL" + ac_status = "PASS" if not cycles else "FAIL" + vu_status = "PASS" if not n_multi else "FAIL" + if wf_issues or cycles or n_multi: + all_pass = False + + print(f" [{wf_status}] Well-formed — all edge targets exist as nodes") + if self_loops and not cycles: + print( + f" [{ac_status}] Acyclic — no dependency cycles " + f"({len(self_loops)} self-loop warning(s))" + ) + else: + print(f" [{ac_status}] Acyclic — no dependency cycles") + if n_multi: + print( + f" [{vu_status}] Version-unique — " + f"{n_multi} package(s) have multiple versions" + ) + else: + print(f" [{vu_status}] Version-unique — one version per package name") + + if all_pass: + if self_loops: + print() + print(f"--- {len(self_loops)} self-loop warning(s) ---\n") + for issue in self_loops: + print(f" {issue}") + print() + print("All checks pass. Self-loops are informational only.") + else: + print("\nAll checks pass. No known issues blocking the next build.") + return True + + print() + + # Build efficiency + if entries: + n_extra = sum(len(e["versions"]) - 1 for e in collapsible) + print( + f"Build efficiency: {n_nodes} wheels built, " + f"{n_extra} install-dep extra can be eliminated" + ) + print() + + # Conflict classification + if entries: + n_coll = len(collapsible) + n_req = len(required) + print( + f"{n_multi} install-dep package(s) with multiple versions " + f"({n_coll} collapsible, {n_req} required):" + ) + print() + + if required: + print( + f" {n_req} required — no single version satisfies " + "all consumers (update binding parent):" + ) + for e in required: + spec = e.get("tightest_spec", "") + parent = e.get("tightest_parent", "unknown") + print(f" {e['name']} — bound by {parent} ({spec})") + print() + + if collapsible: + print(f" {n_coll} collapsible — one version satisfies all consumers:") + for e in collapsible: + spec = e.get("tightest_spec", "") + parent = e.get("tightest_parent", "unknown") + print(f" {e['name']}=={e['pin']} — bound by {parent} ({spec})") + print() + + pin_list = ", ".join(f"{e['name']}=={e['pin']}" for e in collapsible) + print(f" Or constrain to one version: {pin_list}") + print() + + # Structural issues + if wf_issues: + print(f"--- {len(wf_issues)} dangling edge(s) ---\n") + for issue in wf_issues: + print(f" {issue}") + print() + + if cycles: + print(f"--- {len(cycles)} cycle(s) ---\n") + for issue in cycles: + print(f" {issue}") + print() + + if self_loops: + print(f"--- {len(self_loops)} self-loop warning(s) ---\n") + for issue in self_loops: + print(f" {issue}") + print() + + return False + + +@graph.command() +@click.option( + "--json", + "as_json", + is_flag=True, + default=False, + help="Output results as JSON.", +) +@click.option( + "--constraints", + "as_constraints", + is_flag=True, + default=False, + help="Output collapsible pins in constraints.txt format.", +) +@click.argument( + "graph-file", + type=str, +) +@click.pass_obj +def check( + wkctx: context.WorkContext, + graph_file: str, + as_json: bool, + as_constraints: bool, +) -> None: + """Check graph structure for version conflicts. + + Pre-rebuild check that answers two questions in seconds: + + 1. Is this graph structurally sound? (well-formed, acyclic) + + 2. How many wheels are extra? (collapsible vs required) + + Collapsible means one version satisfies all consumers — the extra + version is an unnecessary build. Required means no single version + works — write_constraints_file will fail for this package. + """ + if as_json and as_constraints: + raise click.UsageError("--json and --constraints are mutually exclusive") + + # Load raw dict for well-formedness (needs access to raw edges) + with open(graph_file) as f: + graph_dict = json.load(f) + + wf_issues = _check_well_formed(graph_dict) + + # Acyclicity runs on raw dict (DependencyGraph absorbs cycles/self-loops) + cycles, self_loops = _check_acyclicity(graph_dict) + + # Conflict classification needs DependencyGraph. If the graph has + # dangling edges, from_dict will crash (KeyError), so skip. + entries: list[dict[str, typing.Any]] = [] + n_nodes = len(graph_dict) - 1 # exclude root + if not wf_issues: + graph = DependencyGraph.from_dict(graph_dict) + entries = _classify_conflicts(graph) + n_nodes = len(graph) - 1 # exclude root + + passed = _print_check( + wf_issues, + cycles, + self_loops, + entries, + n_nodes, + as_json=as_json, + as_constraints=as_constraints, + ) + if not passed: + raise SystemExit(1) diff --git a/tests/test_graph_check.py b/tests/test_graph_check.py new file mode 100644 index 00000000..ae525738 --- /dev/null +++ b/tests/test_graph_check.py @@ -0,0 +1,574 @@ +import json +import pathlib + +from click.testing import CliRunner + +from fromager.__main__ import main as fromager +from fromager.commands.graph import ( + _check_acyclicity, + _check_well_formed, + _classify_conflicts, +) +from fromager.dependency_graph import DependencyGraph + + +def _make_graph_dict( + packages: dict[str, dict], +) -> dict[str, dict]: + """Build a minimal graph JSON dict from a simplified spec. + + packages maps "name==version" to {"edges": [{"key": ..., "req_type": ..., "req": ...}]} + The ROOT node is added automatically with toplevel edges to all packages + that aren't edge targets. + """ + all_targets = set() + for info in packages.values(): + for edge in info.get("edges", []): + all_targets.add(edge["key"]) + + roots = [k for k in packages if k not in all_targets] + + result: dict[str, dict] = { + "": { + "download_url": "", + "pre_built": False, + "version": "0", + "canonicalized_name": "", + "edges": [{"key": k, "req_type": "toplevel", "req": k} for k in roots], + } + } + for key, info in packages.items(): + name, version = key.split("==") + result[key] = { + "download_url": "", + "pre_built": False, + "version": version, + "canonicalized_name": name, + "edges": info.get("edges", []), + } + return result + + +def _write_graph(tmp_path: pathlib.Path, graph_dict: dict) -> pathlib.Path: + """Write a graph dict to a JSON file and return the path.""" + p = tmp_path / "graph.json" + p.write_text(json.dumps(graph_dict)) + return p + + +# --------------------------------------------------------------------------- +# Unit tests for _check_well_formed +# --------------------------------------------------------------------------- + + +class TestCheckWellFormed: + """Tests for _check_well_formed on raw graph dicts.""" + + def test_clean_graph(self) -> None: + """No issues when all edge targets exist.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "b==2.0", "req_type": "install", "req": "b>=1.0"}] + }, + "b==2.0": {"edges": []}, + } + ) + assert _check_well_formed(d) == [] + + def test_dangling_edge(self) -> None: + """Detects edges pointing to missing nodes.""" + d = _make_graph_dict({"a==1.0": {"edges": []}}) + d["a==1.0"]["edges"].append( + {"key": "missing==1.0", "req_type": "install", "req": "missing"} + ) + issues = _check_well_formed(d) + assert len(issues) == 1 + assert "missing==1.0" in issues[0] + assert "DANGLING EDGE" in issues[0] + + def test_deduplicates_by_target(self) -> None: + """Multiple edges to same missing target produce one issue.""" + d = _make_graph_dict({"a==1.0": {"edges": []}, "b==1.0": {"edges": []}}) + d["a==1.0"]["edges"].append( + {"key": "missing==1.0", "req_type": "install", "req": "missing"} + ) + d["b==1.0"]["edges"].append( + {"key": "missing==1.0", "req_type": "build-system", "req": "missing"} + ) + issues = _check_well_formed(d) + assert len(issues) == 1 + assert "2 edge(s)" in issues[0] + + +# --------------------------------------------------------------------------- +# Unit tests for _check_acyclicity (operates on raw dict) +# --------------------------------------------------------------------------- + + +class TestCheckAcyclicity: + """Tests for _check_acyclicity on raw graph dicts.""" + + def test_clean_graph(self) -> None: + """No issues for acyclic graph.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "b==2.0", "req_type": "install", "req": "b>=1.0"}] + }, + "b==2.0": {"edges": []}, + } + ) + cycles, self_loops = _check_acyclicity(d) + assert cycles == [] + assert self_loops == [] + + def test_self_loop(self) -> None: + """Self-loops are returned as warnings, not cycles.""" + d = _make_graph_dict({"a==1.0": {"edges": []}}) + d["a==1.0"]["edges"].append( + {"key": "a==1.0", "req_type": "install", "req": "a[extras]"} + ) + cycles, self_loops = _check_acyclicity(d) + assert cycles == [] + assert len(self_loops) == 1 + assert "SELF-LOOP" in self_loops[0] + assert "a==1.0" in self_loops[0] + + def test_cycle(self) -> None: + """Detects a two-node cycle in raw dict.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "b==1.0", "req_type": "install", "req": "b"}] + }, + "b==1.0": { + "edges": [{"key": "a==1.0", "req_type": "install", "req": "a"}] + }, + } + ) + cycles, self_loops = _check_acyclicity(d) + assert any("CYCLE" in i for i in cycles) + assert self_loops == [] + + def test_dangling_edge_skipped(self) -> None: + """Dangling edges don't crash cycle detection.""" + d = _make_graph_dict({"a==1.0": {"edges": []}}) + d["a==1.0"]["edges"].append( + {"key": "missing==1.0", "req_type": "install", "req": "missing"} + ) + # Should not raise + cycles, _self_loops = _check_acyclicity(d) + assert cycles == [] + + def test_three_node_cycle(self) -> None: + """Detects a three-node cycle: a -> b -> c -> a.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "b==1.0", "req_type": "install", "req": "b"}] + }, + "b==1.0": { + "edges": [{"key": "c==1.0", "req_type": "install", "req": "c"}] + }, + "c==1.0": { + "edges": [{"key": "a==1.0", "req_type": "install", "req": "a"}] + }, + } + ) + cycles, self_loops = _check_acyclicity(d) + assert len(cycles) == 1 + assert "a==1.0" in cycles[0] + assert "b==1.0" in cycles[0] + assert "c==1.0" in cycles[0] + assert self_loops == [] + + +# --------------------------------------------------------------------------- +# Unit tests for _classify_conflicts +# --------------------------------------------------------------------------- + + +class TestClassifyConflicts: + """Tests for _classify_conflicts on DependencyGraph objects.""" + + def test_no_conflicts(self) -> None: + """No entries when each package has one version.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "b==2.0", "req_type": "install", "req": "b>=1.0"}] + }, + "b==2.0": {"edges": []}, + } + ) + graph = DependencyGraph.from_dict(d) + assert _classify_conflicts(graph) == [] + + def test_collapsible_conflict(self) -> None: + """Detects collapsible when one version satisfies all consumers.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "c==1.0", "req_type": "install", "req": "c>=1.0"}] + }, + "b==1.0": { + "edges": [{"key": "c==2.0", "req_type": "install", "req": "c>=1.0"}] + }, + "c==1.0": {"edges": []}, + "c==2.0": {"edges": []}, + } + ) + graph = DependencyGraph.from_dict(d) + entries = _classify_conflicts(graph) + assert len(entries) == 1 + assert entries[0]["name"] == "c" + assert entries[0]["pin"] == "2.0" # highest feasible version preferred + + def test_collapsible_asymmetric(self) -> None: + """When only one specific version is the valid pin.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "c==1.0", "req_type": "install", "req": "c>=1.0"}] + }, + "b==1.0": { + "edges": [ + {"key": "c==2.0", "req_type": "install", "req": "c>=2.0,<3.0"} + ] + }, + "c==1.0": {"edges": []}, + "c==2.0": {"edges": []}, + } + ) + graph = DependencyGraph.from_dict(d) + entries = _classify_conflicts(graph) + assert len(entries) == 1 + # Only 2.0 satisfies both >=1.0 and >=2.0,<3.0 + assert entries[0]["pin"] == "2.0" + + def test_required_conflict(self) -> None: + """Detects required when no single version satisfies all consumers.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "c==1.0", "req_type": "install", "req": "c<2.0"}] + }, + "b==1.0": { + "edges": [{"key": "c==2.0", "req_type": "install", "req": "c>=2.0"}] + }, + "c==1.0": {"edges": []}, + "c==2.0": {"edges": []}, + } + ) + graph = DependencyGraph.from_dict(d) + entries = _classify_conflicts(graph) + assert len(entries) == 1 + assert entries[0]["name"] == "c" + assert entries[0]["pin"] is None + + def test_build_system_only_invisible(self) -> None: + """Build-system multi-version packages are not reported. + + _classify_conflicts uses get_install_dependency_versions(), so + build-system-only version splits (e.g. setuptools 69.0 vs 70.0) + are intentionally excluded. + """ + d = _make_graph_dict( + { + "a==1.0": { + "edges": [ + { + "key": "setuptools==69.0", + "req_type": "build-system", + "req": "setuptools", + } + ] + }, + "b==1.0": { + "edges": [ + { + "key": "setuptools==70.0", + "req_type": "build-system", + "req": "setuptools", + } + ] + }, + "setuptools==69.0": {"edges": []}, + "setuptools==70.0": {"edges": []}, + } + ) + graph = DependencyGraph.from_dict(d) + entries = _classify_conflicts(graph) + assert entries == [] + + def test_same_parent_two_versions(self) -> None: + """Parent with edges to two versions of the same dependency.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [ + {"key": "c==1.0", "req_type": "install", "req": "c>=1.0"}, + {"key": "c==2.0", "req_type": "install", "req": "c>=1.0"}, + ] + }, + "c==1.0": {"edges": []}, + "c==2.0": {"edges": []}, + } + ) + graph = DependencyGraph.from_dict(d) + entries = _classify_conflicts(graph) + assert len(entries) == 1 + assert entries[0]["name"] == "c" + # Both versions satisfy >=1.0, so it should be collapsible + assert entries[0]["pin"] is not None + + +# --------------------------------------------------------------------------- +# CLI integration tests via CliRunner +# --------------------------------------------------------------------------- + + +class TestCheckCLI: + """Integration tests for `fromager graph check`.""" + + def test_clean_graph_passes( + self, cli_runner: CliRunner, e2e_path: pathlib.Path + ) -> None: + """build-parallel graph should pass all checks.""" + graph_json = e2e_path / "build-parallel" / "graph.json" + result = cli_runner.invoke(fromager, ["graph", "check", str(graph_json)]) + assert result.exit_code == 0 + assert "[PASS] Well-formed" in result.output + assert "[PASS] Acyclic" in result.output + assert "[PASS] Version-unique" in result.output + assert "All checks pass" in result.output + + def test_clean_graph_json( + self, cli_runner: CliRunner, e2e_path: pathlib.Path + ) -> None: + """JSON output for clean graph has correct structure.""" + graph_json = e2e_path / "build-parallel" / "graph.json" + result = cli_runner.invoke( + fromager, ["graph", "check", "--json", str(graph_json)] + ) + assert result.exit_code == 0 + data = json.loads(result.output) + assert data["well_formed"]["pass"] is True + assert data["acyclic"]["pass"] is True + assert data["version_unique"]["pass"] is True + assert data["conflict_analysis"] == [] + + def test_clean_graph_constraints( + self, cli_runner: CliRunner, e2e_path: pathlib.Path + ) -> None: + """Constraints output for clean graph is empty (nothing to pin).""" + graph_json = e2e_path / "build-parallel" / "graph.json" + result = cli_runner.invoke( + fromager, ["graph", "check", "--constraints", str(graph_json)] + ) + assert result.exit_code == 0 + assert result.output.strip() == "" + + def test_conflict_graph_fails( + self, cli_runner: CliRunner, tmp_path: pathlib.Path + ) -> None: + """Graph with conflicts exits non-zero.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "c==1.0", "req_type": "install", "req": "c<2.0"}] + }, + "b==1.0": { + "edges": [{"key": "c==2.0", "req_type": "install", "req": "c>=2.0"}] + }, + "c==1.0": {"edges": []}, + "c==2.0": {"edges": []}, + } + ) + graph_file = _write_graph(tmp_path, d) + result = cli_runner.invoke(fromager, ["graph", "check", str(graph_file)]) + assert result.exit_code != 0 + assert "[FAIL] Version-unique" in result.output + + def test_conflict_json_structure( + self, cli_runner: CliRunner, tmp_path: pathlib.Path + ) -> None: + """JSON output for conflict graph has conflict_analysis entries.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "c==1.0", "req_type": "install", "req": "c>=1.0"}] + }, + "b==1.0": { + "edges": [{"key": "c==2.0", "req_type": "install", "req": "c>=1.0"}] + }, + "c==1.0": {"edges": []}, + "c==2.0": {"edges": []}, + } + ) + graph_file = _write_graph(tmp_path, d) + result = cli_runner.invoke( + fromager, ["graph", "check", "--json", str(graph_file)] + ) + data = json.loads(result.output) + assert data["version_unique"]["pass"] is False + assert data["version_unique"]["n_conflicts"] == 1 + assert len(data["conflict_analysis"]) == 1 + assert data["conflict_analysis"][0]["name"] == "c" + assert data["conflict_analysis"][0]["pin"] is not None + + def test_constraints_collapsible_exits_zero( + self, cli_runner: CliRunner, tmp_path: pathlib.Path + ) -> None: + """Constraints mode with collapsible-only conflict exits 0.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "c==1.0", "req_type": "install", "req": "c>=1.0"}] + }, + "b==1.0": { + "edges": [{"key": "c==2.0", "req_type": "install", "req": "c>=1.0"}] + }, + "c==1.0": {"edges": []}, + "c==2.0": {"edges": []}, + } + ) + graph_file = _write_graph(tmp_path, d) + result = cli_runner.invoke( + fromager, ["graph", "check", "--constraints", str(graph_file)] + ) + assert result.exit_code == 0 + lines = result.output.strip().splitlines() + assert len(lines) == 1 + assert lines[0].startswith("c==") + + def test_constraints_required_exits_nonzero( + self, cli_runner: CliRunner, tmp_path: pathlib.Path + ) -> None: + """Constraints mode with required conflict exits non-zero.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "c==1.0", "req_type": "install", "req": "c<2.0"}] + }, + "b==1.0": { + "edges": [{"key": "c==2.0", "req_type": "install", "req": "c>=2.0"}] + }, + "c==1.0": {"edges": []}, + "c==2.0": {"edges": []}, + } + ) + graph_file = _write_graph(tmp_path, d) + result = cli_runner.invoke( + fromager, ["graph", "check", "--constraints", str(graph_file)] + ) + assert result.exit_code != 0 + # No output — nothing collapsible to suggest + assert result.output.strip() == "" + + def test_json_and_constraints_mutually_exclusive( + self, cli_runner: CliRunner, tmp_path: pathlib.Path + ) -> None: + """Passing both --json and --constraints is an error.""" + d = _make_graph_dict({"a==1.0": {"edges": []}}) + graph_file = _write_graph(tmp_path, d) + result = cli_runner.invoke( + fromager, ["graph", "check", "--json", "--constraints", str(graph_file)] + ) + assert result.exit_code != 0 + assert "mutually exclusive" in result.output + + def test_dangling_edge_fails( + self, cli_runner: CliRunner, tmp_path: pathlib.Path + ) -> None: + """Graph with dangling edges exits non-zero and reports issue.""" + d = _make_graph_dict({"a==1.0": {"edges": []}}) + d["a==1.0"]["edges"].append( + {"key": "missing==1.0", "req_type": "install", "req": "missing"} + ) + graph_file = _write_graph(tmp_path, d) + result = cli_runner.invoke(fromager, ["graph", "check", str(graph_file)]) + assert result.exit_code != 0 + assert "[FAIL] Well-formed" in result.output + assert "dangling edge" in result.output.lower() + + def test_self_loop_warning( + self, cli_runner: CliRunner, tmp_path: pathlib.Path + ) -> None: + """Self-loop is a warning, not a failure — exits 0.""" + d = _make_graph_dict({"a==1.0": {"edges": []}}) + d["a==1.0"]["edges"].append( + {"key": "a==1.0", "req_type": "install", "req": "a[extras]"} + ) + graph_file = _write_graph(tmp_path, d) + result = cli_runner.invoke(fromager, ["graph", "check", str(graph_file)]) + assert result.exit_code == 0 + assert "[PASS] Acyclic" in result.output + assert "self-loop warning" in result.output.lower() + assert "SELF-LOOP" in result.output + + def test_cycle_detected( + self, cli_runner: CliRunner, tmp_path: pathlib.Path + ) -> None: + """Cycle in raw graph is detected and reported.""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "b==1.0", "req_type": "install", "req": "b"}] + }, + "b==1.0": { + "edges": [{"key": "a==1.0", "req_type": "install", "req": "a"}] + }, + } + ) + graph_file = _write_graph(tmp_path, d) + result = cli_runner.invoke(fromager, ["graph", "check", str(graph_file)]) + assert result.exit_code != 0 + assert "CYCLE" in result.output + + def test_json_cycle_and_self_loop( + self, cli_runner: CliRunner, tmp_path: pathlib.Path + ) -> None: + """JSON output separates cycles (errors) from self-loops (warnings).""" + d = _make_graph_dict( + { + "a==1.0": { + "edges": [{"key": "b==1.0", "req_type": "install", "req": "b"}] + }, + "b==1.0": { + "edges": [{"key": "a==1.0", "req_type": "install", "req": "a"}] + }, + "c==1.0": {"edges": []}, + } + ) + # Add self-loop to c + d["c==1.0"]["edges"].append( + {"key": "c==1.0", "req_type": "install", "req": "c[extras]"} + ) + graph_file = _write_graph(tmp_path, d) + result = cli_runner.invoke( + fromager, ["graph", "check", "--json", str(graph_file)] + ) + data = json.loads(result.output) + assert data["acyclic"]["pass"] is False + assert len(data["acyclic"]["cycles"]) == 1 + assert "CYCLE" in data["acyclic"]["cycles"][0] + assert len(data["acyclic"]["self_loops"]) == 1 + assert "SELF-LOOP" in data["acyclic"]["self_loops"][0] + + def test_empty_graph(self, cli_runner: CliRunner, tmp_path: pathlib.Path) -> None: + """Graph with only ROOT node passes all checks.""" + d = { + "": { + "download_url": "", + "pre_built": False, + "version": "0", + "canonicalized_name": "", + "edges": [], + } + } + graph_file = _write_graph(tmp_path, d) + result = cli_runner.invoke(fromager, ["graph", "check", str(graph_file)]) + assert result.exit_code == 0 + assert "All checks pass" in result.output