From 8c85495c75746661376a8c97ca76ec9dcf5247e2 Mon Sep 17 00:00:00 2001 From: vegetabledoww <17863935975@163.com> Date: Tue, 19 May 2026 14:59:49 +0800 Subject: [PATCH 1/3] Add: Insight Trace workspace generation for MindStudio profiling Implement Insight Trace feature to generate MindStudio Insight-compatible trace data (trace.json + visualize_data.bin) for incore kernel instruction-level profiling. Supports simpler and ptoas dual backends. --- simpler_setup/insight_trace/__init__.py | 9 + simpler_setup/insight_trace/arg_resolver.py | 122 ++++++ simpler_setup/insight_trace/case_loader.py | 59 +++ simpler_setup/insight_trace/cli.py | 148 ++++++++ .../insight_trace/kernel_analyzer.py | 84 +++++ simpler_setup/insight_trace/models.py | 121 ++++++ simpler_setup/insight_trace/ptoas_backend.py | 194 ++++++++++ simpler_setup/insight_trace/runner.py | 53 +++ simpler_setup/insight_trace/templates.py | 348 ++++++++++++++++++ simpler_setup/insight_trace/workspace.py | 39 ++ simpler_setup/tools/insight_trace.py | 12 + tests/ut/py/test_insight_trace_core.py | 44 +++ 12 files changed, 1233 insertions(+) create mode 100644 simpler_setup/insight_trace/__init__.py create mode 100644 simpler_setup/insight_trace/arg_resolver.py create mode 100644 simpler_setup/insight_trace/case_loader.py create mode 100644 simpler_setup/insight_trace/cli.py create mode 100644 simpler_setup/insight_trace/kernel_analyzer.py create mode 100644 simpler_setup/insight_trace/models.py create mode 100644 simpler_setup/insight_trace/ptoas_backend.py create mode 100644 simpler_setup/insight_trace/runner.py create mode 100644 simpler_setup/insight_trace/templates.py create mode 100644 simpler_setup/insight_trace/workspace.py create mode 100644 simpler_setup/tools/insight_trace.py create mode 100644 tests/ut/py/test_insight_trace_core.py diff --git a/simpler_setup/insight_trace/__init__.py b/simpler_setup/insight_trace/__init__.py new file mode 100644 index 000000000..8c1e47a54 --- /dev/null +++ b/simpler_setup/insight_trace/__init__.py @@ -0,0 +1,9 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""MindStudio Insight trace workspace generation.""" diff --git a/simpler_setup/insight_trace/arg_resolver.py b/simpler_setup/insight_trace/arg_resolver.py new file mode 100644 index 000000000..885189e04 --- /dev/null +++ b/simpler_setup/insight_trace/arg_resolver.py @@ -0,0 +1,122 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from __future__ import annotations + +import ctypes +import json +import struct +from pathlib import Path +from typing import Any + +from .kernel_analyzer import read_arg_indices +from .models import KernelSpec, SceneCaseContext, TraceArg, TraceScalarArg, TraceTensorArg + + +def resolve_args(context: SceneCaseContext, kernel: KernelSpec, arg_spec: Path | None = None) -> tuple[TraceArg, ...]: + if arg_spec is not None: + return _load_arg_spec(arg_spec) + args = _paged_attention_recipe(context, kernel) + read_indices = read_arg_indices(kernel.source_path) + missing = sorted(index for index in read_indices if index not in {arg.index for arg in args}) + if missing: + raise ValueError(f"Argument recipe for {kernel.name} does not cover args indices: {missing}") + return args + + +def _load_arg_spec(path: Path) -> tuple[TraceArg, ...]: + raw = json.loads(path.read_text()) + result: list[TraceArg] = [] + for item in raw.get("args", raw): + if item["kind"] == "tensor": + result.append( + TraceTensorArg( + index=int(item["index"]), + name=item["name"], + dtype=item["dtype"], + shape=tuple(int(dim) for dim in item["shape"]), + role=item.get("role", "input"), + fill=item.get("fill", "zero"), + ) + ) + elif item["kind"] == "scalar": + result.append( + TraceScalarArg( + index=int(item["index"]), + name=item["name"], + dtype=item["dtype"], + value=item["value"], + pack_mode=item.get("pack_mode", "value"), + ) + ) + else: + raise ValueError(f"Unknown arg kind: {item['kind']}") + return tuple(sorted(result, key=lambda arg: arg.index)) + + +def _paged_attention_recipe(context: SceneCaseContext, kernel: KernelSpec) -> tuple[TraceArg, ...]: + if "paged_attention" not in context.module_dir.as_posix(): + raise ValueError("No built-in insight trace recipe for this test module; pass --arg-spec") + params = context.case.get("params", {}) + q_tile = 16 + block_size = int(params["block_size"]) + head_dim = int(params["head_dim"]) + scale = _scalar_value(context, "scale", default=1.0) + + recipes: dict[str, tuple[TraceArg, ...]] = { + "QK": ( + TraceTensorArg(0, "qi", "BFLOAT16", (q_tile, head_dim)), + TraceTensorArg(1, "kj", "BFLOAT16", (block_size, head_dim)), + TraceTensorArg(2, "sij", "FLOAT32", (q_tile, block_size)), + ), + "SF": ( + TraceTensorArg(0, "sij", "FLOAT32", (q_tile, block_size)), + TraceTensorArg(1, "pij", "BFLOAT16", (q_tile, block_size)), + TraceTensorArg(2, "mij", "FLOAT32", (q_tile,)), + TraceTensorArg(3, "lij", "FLOAT32", (q_tile,)), + TraceScalarArg(4, "scale", "FLOAT32_BITS", _f32_bits(float(scale)), "bits"), + ), + "PV": ( + TraceTensorArg(0, "pij", "BFLOAT16", (q_tile, block_size)), + TraceTensorArg(1, "vj", "BFLOAT16", (block_size, head_dim)), + TraceTensorArg(2, "oi_new", "FLOAT32", (q_tile, head_dim)), + ), + "UP": ( + TraceTensorArg(0, "mij", "FLOAT32", (q_tile,)), + TraceTensorArg(1, "lij", "FLOAT32", (q_tile,)), + TraceTensorArg(2, "oi_new", "FLOAT32", (q_tile, head_dim)), + TraceTensorArg(3, "mi", "FLOAT32", (q_tile,)), + TraceTensorArg(4, "li", "FLOAT32", (q_tile,)), + TraceTensorArg(5, "oi", "FLOAT32", (q_tile, head_dim)), + TraceTensorArg(6, "dst", "FLOAT32", (q_tile, head_dim)), + TraceScalarArg(7, "is_first", "UINT64", 1), + TraceScalarArg(8, "is_last", "UINT64", 1), + ), + } + if kernel.name not in recipes: + raise ValueError(f"No paged_attention recipe for kernel {kernel.name}") + return recipes[kernel.name] + + +def _scalar_value(context: SceneCaseContext, name: str, default: Any) -> Any: + try: + builder = context.test_class().generate_args(context.case.get("params", {})) + except Exception: # noqa: BLE001 + return default + for spec in getattr(builder, "specs", []): + if getattr(spec, "name", None) != name: + continue + value = spec.value + if isinstance(value, ctypes._SimpleCData): + return value.value + return value + return default + + +def _f32_bits(value: float) -> int: + return struct.unpack("I", struct.pack("f", value))[0] diff --git a/simpler_setup/insight_trace/case_loader.py b/simpler_setup/insight_trace/case_loader.py new file mode 100644 index 000000000..eb0b84a1d --- /dev/null +++ b/simpler_setup/insight_trace/case_loader.py @@ -0,0 +1,59 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from __future__ import annotations + +import importlib.util +import inspect +import sys +from pathlib import Path +from types import ModuleType + +from .models import SceneCaseContext + + +def load_module(path: Path) -> ModuleType: + path = path.resolve() + spec = importlib.util.spec_from_file_location(path.stem, str(path)) + if spec is None or spec.loader is None: + raise ValueError(f"Cannot load test module: {path}") + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + + +def find_scene_test_class(module: ModuleType) -> type: + candidates = [] + for obj in module.__dict__.values(): + if inspect.isclass(obj) and hasattr(obj, "CALLABLE") and hasattr(obj, "CASES"): + if getattr(obj, "_st_level", None) == 2: + candidates.append(obj) + if not candidates: + raise ValueError("No level-2 SceneTestCase class found") + if len(candidates) > 1: + names = ", ".join(cls.__name__ for cls in candidates) + raise ValueError(f"Multiple SceneTestCase classes found: {names}") + return candidates[0] + + +def load_scene_case(test_module: Path, case_name: str) -> SceneCaseContext: + module = load_module(test_module) + test_class = find_scene_test_class(module) + case = next((case for case in test_class.CASES if case.get("name") == case_name), None) + if case is None: + available = ", ".join(case.get("name", "") for case in test_class.CASES) + raise ValueError(f"Unknown case {case_name!r}; available cases: {available}") + return SceneCaseContext( + test_class=test_class, + case=case, + callable_spec=test_class.CALLABLE, + test_module=test_module.resolve(), + module_dir=test_module.resolve().parent, + runtime=getattr(test_class, "_st_runtime", ""), + ) diff --git a/simpler_setup/insight_trace/cli.py b/simpler_setup/insight_trace/cli.py new file mode 100644 index 000000000..89c891c96 --- /dev/null +++ b/simpler_setup/insight_trace/cli.py @@ -0,0 +1,148 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from __future__ import annotations + +import argparse +import os +from datetime import datetime +from pathlib import Path + +from simpler_setup.environment import PROJECT_ROOT +from simpler_setup.pto_isa import ensure_pto_isa_root + +from .arg_resolver import resolve_args +from .case_loader import load_scene_case +from .kernel_analyzer import select_kernel, validate_single_task_kernel +from .models import PtoasTraceConfig, TraceBackend, TraceConfig +from .ptoas_backend import collect_ptoas_trace, generate_ptoas_workspace +from .runner import run_workspace +from .workspace import create_workspace + + +def main(argv: list[str] | None = None) -> int: + parser = _build_parser() + args = parser.parse_args(argv) + try: + if args.backend == TraceBackend.PTOAS.value: + result = _run_ptoas(args) + else: + result = _run_simpler(args) + except Exception as exc: # noqa: BLE001 + print(f"insight trace failed: {exc}") + return 1 + print(f"Insight trace workspace: {result.workspace_dir}") + if result.simulator_dir is not None: + print(f"MindStudio Insight input: {result.simulator_dir}") + return 0 + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Generate MindStudio Insight trace data for an incore kernel") + parser.add_argument("test_module", nargs="?", type=Path) + parser.add_argument("--backend", choices=[item.value for item in TraceBackend], default=TraceBackend.SIMPLER.value) + parser.add_argument("--case") + selector = parser.add_mutually_exclusive_group() + selector.add_argument("--kernel") + selector.add_argument("--func-id", type=int) + selector.add_argument("--kernel-source") + parser.add_argument("--platform", default="a2a3") + parser.add_argument("--runtime", default="tensormap_and_ringbuffer") + parser.add_argument("--output-dir", type=Path) + parser.add_argument("--cann-home", type=Path, default=_default_cann_home()) + parser.add_argument("--pto-isa-root", type=Path) + parser.add_argument("--soc-version", default="dav_2201") + parser.add_argument("--device", type=int, default=0) + parser.add_argument("--launch-count", type=int, default=1) + parser.add_argument("--timeout", type=int, default=120) + parser.add_argument("--hw-block-num", type=int, default=1) + parser.add_argument("--arg-spec", type=Path) + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--ptoas-root", type=Path) + parser.add_argument("--source-cpp", type=Path) + parser.add_argument("--kernel-base-name") + parser.add_argument("--aicore-arch") + parser.add_argument("--kernel-symbol") + return parser + + +def _run_simpler(args: argparse.Namespace): + if args.test_module is None or args.case is None: + raise ValueError("simpler backend requires test_module and --case") + context = load_scene_case(args.test_module, args.case) + kernel = select_kernel(context, args.kernel, args.func_id, args.kernel_source) + validate_single_task_kernel(kernel) + trace_args = resolve_args(context, kernel, args.arg_spec) + output_dir = args.output_dir or _default_output_dir(args.case, kernel.name) + config = TraceConfig( + backend=TraceBackend.SIMPLER, + test_module=args.test_module.resolve(), + case_name=args.case, + kernel_spec=kernel, + args=trace_args, + output_dir=output_dir, + repo_root=PROJECT_ROOT, + cann_home=args.cann_home, + pto_isa_root=_pto_isa_root(args.pto_isa_root), + soc_version=args.soc_version, + device=args.device, + launch_count=args.launch_count, + timeout=args.timeout, + hw_block_num=args.hw_block_num, + dry_run=args.dry_run, + ) + result = create_workspace(config) + if args.dry_run: + return result + return run_workspace(config) + + +def _run_ptoas(args: argparse.Namespace): + if args.ptoas_root is None or args.source_cpp is None or args.kernel_base_name is None or args.aicore_arch is None: + raise ValueError("ptoas backend requires --ptoas-root, --source-cpp, --kernel-base-name, and --aicore-arch") + testcase = f"{args.kernel_base_name}_msprof" + config = PtoasTraceConfig( + ptoas_root=args.ptoas_root.resolve(), + source_cpp=args.source_cpp.resolve(), + testcase_name=testcase, + kernel_base_name=args.kernel_base_name, + aicore_arch=args.aicore_arch, + output_dir=args.output_dir or _default_output_dir("ptoas", args.kernel_base_name), + cann_home=args.cann_home, + pto_isa_root=_pto_isa_root(args.pto_isa_root), + soc_version=args.soc_version, + timeout=args.timeout, + launch_count=args.launch_count, + kernel_symbol=args.kernel_symbol, + ) + if args.dry_run: + workspace = generate_ptoas_workspace(config) + return type("DryRunResult", (), {"workspace_dir": workspace.run_root, "simulator_dir": None})() + return collect_ptoas_trace(config) + + +def _default_output_dir(case_name: str, kernel_name: str) -> Path: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + safe_case = case_name.replace("/", "_") + safe_kernel = kernel_name.replace("/", "_") + return PROJECT_ROOT / "outputs" / f"insight_trace_{safe_case}_{safe_kernel}_{timestamp}" + + +def _pto_isa_root(path: Path | None) -> Path: + if path is not None: + return path.resolve() + return Path(ensure_pto_isa_root()).resolve() + + +def _default_cann_home() -> Path | None: + value = os.environ.get("CANN_HOME") or os.environ.get("ASCEND_HOME_PATH") + return Path(value) if value else None + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/simpler_setup/insight_trace/kernel_analyzer.py b/simpler_setup/insight_trace/kernel_analyzer.py new file mode 100644 index 000000000..0af32276f --- /dev/null +++ b/simpler_setup/insight_trace/kernel_analyzer.py @@ -0,0 +1,84 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from __future__ import annotations + +import re +from pathlib import Path + +from .models import KernelShape, KernelSpec, SceneCaseContext + +_ARG_READ_RE = re.compile(r"args\[(\d+)\]") + + +def select_kernel( + context: SceneCaseContext, + kernel_name: str | None = None, + func_id: int | None = None, + kernel_source: str | None = None, +) -> KernelSpec: + selectors = [kernel_name is not None, func_id is not None, kernel_source is not None] + if sum(selectors) != 1: + raise ValueError("Exactly one of --kernel, --func-id, or --kernel-source is required") + + incores = context.callable_spec.get("incores", []) + selected = None + for entry in incores: + source = Path(entry.get("source", "")) + if kernel_name is not None and entry.get("name") == kernel_name: + selected = entry + break + if func_id is not None and entry.get("func_id") == func_id: + selected = entry + break + if kernel_source is not None: + requested = Path(kernel_source) + if source == requested or source.name == requested.name or source.as_posix().endswith(requested.as_posix()): + selected = entry + break + if selected is None: + raise ValueError("Kernel selector did not match any CALLABLE['incores'] entry") + + source_path = Path(selected["source"]).resolve() + shape = classify_kernel(selected.get("core_type", ""), source_path) + return KernelSpec( + name=selected.get("name", source_path.stem), + func_id=int(selected["func_id"]), + core_type=selected.get("core_type", ""), + source_path=source_path, + shape=shape, + ) + + +def classify_kernel(core_type: str, source_path: Path) -> KernelShape: + source = source_path.read_text() + if "SPMD_LOCAL_CONTEXT_INDEX" in source or "SPMD_GLOBAL_CONTEXT_INDEX" in source: + return KernelShape.SPMD_MIX + if "args[48]" in source or "args[49]" in source: + return KernelShape.SPMD_MIX + if core_type == "aic": + return KernelShape.AIC_ONLY + if core_type == "aiv": + return KernelShape.AIV_ONLY + if "/kernels/aic/" in source_path.as_posix(): + return KernelShape.AIC_ONLY + if "/kernels/aiv/" in source_path.as_posix(): + return KernelShape.AIV_ONLY + raise ValueError(f"Cannot classify kernel core type: {core_type!r}") + + +def read_arg_indices(source_path: Path) -> set[int]: + return {int(match.group(1)) for match in _ARG_READ_RE.finditer(source_path.read_text())} + + +def validate_single_task_kernel(kernel: KernelSpec) -> None: + if kernel.shape == KernelShape.SPMD_MIX: + raise ValueError("SPMD mix kernels are not supported by the simpler backend yet") + source = kernel.source_path.read_text() + if "kernel_entry" not in source or "int64_t *args" not in source: + raise ValueError(f"Kernel does not look like kernel_entry(args): {kernel.source_path}") diff --git a/simpler_setup/insight_trace/models.py b/simpler_setup/insight_trace/models.py new file mode 100644 index 000000000..00993630f --- /dev/null +++ b/simpler_setup/insight_trace/models.py @@ -0,0 +1,121 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any, Optional, Union + + +class KernelShape(str, Enum): + AIC_ONLY = "aic-only" + AIV_ONLY = "aiv-only" + SPMD_MIX = "spmd-mix" + + +class TraceBackend(str, Enum): + SIMPLER = "simpler" + PTOAS = "ptoas" + + +@dataclass(frozen=True) +class KernelSpec: + name: str + func_id: int + core_type: str + source_path: Path + shape: KernelShape + + +@dataclass(frozen=True) +class TraceTensorArg: + index: int + name: str + dtype: str + shape: tuple[int, ...] + role: str = "input" + fill: str = "zero" + + +@dataclass(frozen=True) +class TraceScalarArg: + index: int + name: str + dtype: str + value: Union[int, float] + pack_mode: str = "value" + + +TraceArg = Union[TraceTensorArg, TraceScalarArg] + + +@dataclass(frozen=True) +class TraceConfig: + backend: TraceBackend + test_module: Optional[Path] + case_name: Optional[str] + kernel_spec: Optional[KernelSpec] + args: tuple[TraceArg, ...] + output_dir: Path + repo_root: Path + cann_home: Optional[Path] + pto_isa_root: Optional[Path] + soc_version: str = "dav_2201" + device: int = 0 + launch_count: int = 1 + timeout: int = 120 + hw_block_num: int = 1 + dry_run: bool = False + metadata: dict[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True) +class TraceResult: + workspace_dir: Path + simulator_dir: Optional[Path] + collect_log: Optional[Path] + export_log: Optional[Path] + + +@dataclass(frozen=True) +class SceneCaseContext: + test_class: type + case: dict[str, Any] + callable_spec: dict[str, Any] + test_module: Path + module_dir: Path + runtime: str + + +@dataclass(frozen=True) +class PtoasTraceConfig: + ptoas_root: Path + source_cpp: Path + testcase_name: str + kernel_base_name: str + aicore_arch: str + output_dir: Path + cann_home: Optional[Path] + pto_isa_root: Optional[Path] + soc_version: str = "dav_2201" + timeout: int = 120 + launch_count: int = 1 + kernel_symbol: Optional[str] = None + + +@dataclass(frozen=True) +class PtoasWorkspace: + run_root: Path + case_root: Path + case_dir: Path + build_dir: Path + application: Path + kernel_lib: Path + kernel_symbol: str diff --git a/simpler_setup/insight_trace/ptoas_backend.py b/simpler_setup/insight_trace/ptoas_backend.py new file mode 100644 index 000000000..fb1122e97 --- /dev/null +++ b/simpler_setup/insight_trace/ptoas_backend.py @@ -0,0 +1,194 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from __future__ import annotations + +import os +import shutil +import subprocess +from pathlib import Path + +from .models import PtoasTraceConfig, PtoasWorkspace, TraceResult +from .runner import find_simulator_dir, validate_simulator_dir + + +def generate_ptoas_workspace(config: PtoasTraceConfig) -> PtoasWorkspace: + case_root = config.output_dir / "cases" + case_dir = case_root / "ptoas" / config.testcase_name + build_dir = config.output_dir / "build" + generator = config.ptoas_root / "test" / "npu_validation" / "scripts" / "generate_testcase.py" + if not generator.is_file(): + raise FileNotFoundError(f"PTOAS testcase generator not found: {generator}") + + _run( + [ + "python3", + str(generator), + "--input", + str(config.source_cpp), + "--testcase", + config.testcase_name, + "--output-root", + str(case_root), + "--run-mode", + "sim", + "--soc-version", + config.soc_version, + "--aicore-arch", + config.aicore_arch, + ], + cwd=config.ptoas_root, + ) + env = _env(config, build_dir) + _run( + [ + "cmake", + "-G", + "Ninja", + "-S", + str(case_dir), + "-B", + str(build_dir), + f"-DSOC_VERSION={config.soc_version}", + f"-DPTO_ISA_ROOT={config.pto_isa_root or ''}", + ], + env=env, + ) + _run(["cmake", "--build", str(build_dir), "--target", f"{config.testcase_name}_sim"], env=env) + + kernel_lib = build_dir / f"lib{config.testcase_name}_kernel.so" + symbol = config.kernel_symbol or resolve_kernel_symbol(kernel_lib, config.kernel_base_name) + return PtoasWorkspace( + run_root=config.output_dir, + case_root=case_root, + case_dir=case_dir, + build_dir=build_dir, + application=build_dir / f"{config.testcase_name}_sim", + kernel_lib=kernel_lib, + kernel_symbol=symbol, + ) + + +def collect_ptoas_trace(config: PtoasTraceConfig) -> TraceResult: + workspace = generate_ptoas_workspace(config) + _prepare_inputs(workspace.case_dir) + collect_root = config.output_dir / "msprof_collect" + export_root = config.output_dir / "insight_export" + collect_root.mkdir(parents=True, exist_ok=True) + export_root.mkdir(parents=True, exist_ok=True) + collect_root.chmod(0o700) + export_root.chmod(0o700) + env = _env(config, workspace.build_dir) + collect_log = collect_root / "msprof_collect.log" + with collect_log.open("w") as log: + result = subprocess.run( + [ + "msprof", + "op", + "simulator", + f"--application={workspace.application}", + f"--kernel-name={workspace.kernel_symbol}", + f"--launch-count={config.launch_count}", + f"--soc-version={config.soc_version}", + f"--timeout={config.timeout}", + f"--output={collect_root / 'out'}", + ], + cwd=workspace.case_dir, + env=env, + stdout=log, + stderr=subprocess.STDOUT, + text=True, + check=False, + ) + if result.returncode != 0: + raise RuntimeError(f"PTOAS msprof collect failed; see {collect_log}") + export_src = _export_source(collect_root / "out") + export_log = export_root / "msprof_export.log" + with export_log.open("w") as log: + result = subprocess.run( + ["msprof", "op", "simulator", f"--export={export_src}", f"--output={export_root}"], + env=env, + stdout=log, + stderr=subprocess.STDOUT, + text=True, + check=False, + ) + if result.returncode != 0: + raise RuntimeError(f"PTOAS msprof export failed; see {export_log}") + simulator_dir = find_simulator_dir(export_root) + validate_simulator_dir(simulator_dir) + return TraceResult(config.output_dir, simulator_dir, collect_log, export_log) + + +def resolve_kernel_symbol(kernel_lib: Path, kernel_base_name: str) -> str: + result = subprocess.run(["nm", "-D", str(kernel_lib)], check=True, capture_output=True, text=True) + candidates = [] + for line in result.stdout.splitlines(): + parts = line.split() + if len(parts) < 3 or parts[-2] not in {"T", "W"}: + continue + symbol = parts[-1] + demangled = subprocess.run(["c++filt", symbol], check=False, capture_output=True, text=True).stdout.strip() + if demangled.startswith(f"{kernel_base_name}("): + candidates.append(symbol) + if len(candidates) != 1: + raise ValueError(f"Expected one symbol for {kernel_base_name}, found {len(candidates)}: {candidates}") + return candidates[0] + + +def _prepare_inputs(case_dir: Path) -> None: + golden = case_dir / "golden.py" + if golden.is_file(): + result = subprocess.run(["python3", str(golden)], cwd=case_dir, check=False) + if result.returncode == 0: + return + main_cpp = case_dir / "main.cpp" + if not main_cpp.is_file(): + return + for line in main_cpp.read_text().splitlines(): + if ".bin" not in line or "fileSize_" not in line: + continue + + +def _export_source(out_dir: Path) -> Path: + opprofs = sorted(out_dir.glob("OPPROF_*")) + if not opprofs: + raise FileNotFoundError(f"No OPPROF dir under {out_dir}") + opprof = opprofs[-1] + tmp_dump = opprof / "device0" / "tmp_dump" + if tmp_dump.is_dir(): + pc_start = next((opprof / "device0").glob("**/dump/pc_start_addr.txt"), None) + if pc_start and not (tmp_dump / "pc_start_addr.txt").exists(): + shutil.copy2(pc_start, tmp_dump / "pc_start_addr.txt") + return tmp_dump + dump = opprof / "dump" + if dump.is_dir(): + return dump + raise FileNotFoundError(f"No tmp_dump or dump under {opprof}") + + +def _env(config: PtoasTraceConfig, build_dir: Path) -> dict[str, str]: + env = os.environ.copy() + if config.cann_home is not None: + env["CANN_HOME"] = str(config.cann_home) + env["ASCEND_HOME_PATH"] = str(config.cann_home) + if config.pto_isa_root is not None: + env["PTO_ISA_ROOT"] = str(config.pto_isa_root) + cann_home = env.get("ASCEND_HOME_PATH") or env.get("CANN_HOME") + if cann_home: + sim_lib = Path(cann_home) / "aarch64-linux" / "simulator" / config.soc_version / "lib" + env["LD_LIBRARY_PATH"] = ":".join( + [str(build_dir), str(sim_lib), str(Path(cann_home) / "lib64"), env.get("LD_LIBRARY_PATH", "")] + ) + return env + + +def _run(cmd: list[str], cwd: Path | None = None, env: dict[str, str] | None = None) -> None: + result = subprocess.run(cmd, cwd=cwd, env=env, check=False, text=True) + if result.returncode != 0: + raise RuntimeError(f"Command failed: {' '.join(cmd)}") diff --git a/simpler_setup/insight_trace/runner.py b/simpler_setup/insight_trace/runner.py new file mode 100644 index 000000000..349b9efc9 --- /dev/null +++ b/simpler_setup/insight_trace/runner.py @@ -0,0 +1,53 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from __future__ import annotations + +import os +import subprocess +from pathlib import Path + +from .models import TraceConfig, TraceResult + + +def run_workspace(config: TraceConfig) -> TraceResult: + script = config.output_dir / "run_collect.sh" + env = os.environ.copy() + if config.cann_home is not None: + env["CANN_HOME"] = str(config.cann_home) + if config.pto_isa_root is not None: + env["PTO_ISA_ROOT"] = str(config.pto_isa_root) + env["REPO_ROOT"] = str(config.repo_root) + result = subprocess.run(["bash", str(script)], cwd=str(config.output_dir), env=env, check=False, text=True) + if result.returncode != 0: + raise RuntimeError(f"insight trace collection failed; see {config.output_dir / 'msprof_collect'}") + simulator_dir = find_simulator_dir(config.output_dir / "insight_export") + validate_simulator_dir(simulator_dir) + return TraceResult( + workspace_dir=config.output_dir, + simulator_dir=simulator_dir, + collect_log=config.output_dir / "msprof_collect" / "msprof_collect.log", + export_log=config.output_dir / "insight_export" / "msprof_export.log", + ) + + +def find_simulator_dir(export_root: Path) -> Path: + candidates = sorted(export_root.glob("OPPROF_*/simulator")) + if not candidates: + raise FileNotFoundError(f"No OPPROF simulator directory under {export_root}") + return candidates[-1] + + +def validate_simulator_dir(simulator_dir: Path) -> None: + required = [simulator_dir / "trace.json", simulator_dir / "visualize_data.bin"] + missing = [path for path in required if not path.is_file()] + if missing: + names = ", ".join(str(path) for path in missing) + raise FileNotFoundError(f"Missing Insight trace artifacts: {names}") + if not list(simulator_dir.glob("core*/*instr_exe*.csv")): + raise FileNotFoundError(f"No instr_exe CSV files under {simulator_dir}") diff --git a/simpler_setup/insight_trace/templates.py b/simpler_setup/insight_trace/templates.py new file mode 100644 index 000000000..1f9f16b83 --- /dev/null +++ b/simpler_setup/insight_trace/templates.py @@ -0,0 +1,348 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from __future__ import annotations + +from functools import reduce +from operator import mul + +from .models import KernelShape, TraceConfig, TraceScalarArg, TraceTensorArg + +_DTYPE_CPP = { + "FLOAT32": "DataType::FLOAT32", + "BFLOAT16": "DataType::BFLOAT16", + "INT32": "DataType::INT32", + "UINT64": "DataType::UINT64", +} +_DTYPE_SIZE = { + "FLOAT32": 4, + "BFLOAT16": 2, + "INT32": 4, + "UINT64": 8, +} + + +def render_kernel(config: TraceConfig) -> str: + kernel = _require_kernel(config) + include_guard = "__DAV_CUBE__" if kernel.shape == KernelShape.AIC_ONLY else "__DAV_VEC__" + return f"""#include + +#ifndef AICORE +#define AICORE [aicore] +#endif + +#if defined({include_guard}) +#ifndef __CCE_AICORE__ +#define __CCE_AICORE__ 220 +#endif +#include +#ifndef PTO_NPU_ARCH_A2A3 +#define PTO_NPU_ARCH_A2A3 +#endif +#ifndef EVENT_ID7 +#define EVENT_ID7 ((event_t)7) +#endif +#ifndef PIPE_FIX +#define PIPE_FIX ((pipe_t)10) +#endif +#include \"{kernel.source_path}\" +#endif + +extern "C" __global__ AICORE void replay_entry(__gm__ int64_t *args) {{ +#if defined({include_guard}) + kernel_entry(args); +#endif +}} +""" + + +def render_launch(config: TraceConfig) -> str: + return f"""#include +#ifndef AICORE +#define AICORE [aicore] +#endif + +extern "C" __global__ AICORE void replay_entry(__gm__ int64_t *args); + +extern "C" void launch_replay(void *args, void *stream) {{ + replay_entry<<<{config.hw_block_num}, nullptr, stream>>>((__gm__ int64_t *)args); +}} +""" + + +def render_host(config: TraceConfig) -> str: + tensors = [arg for arg in config.args if isinstance(arg, TraceTensorArg)] + scalars = [arg for arg in config.args if isinstance(arg, TraceScalarArg)] + constants = [] + allocs = [] + tensor_inits = [] + frees = [] + for i, arg in enumerate(tensors): + elements = reduce(mul, arg.shape, 1) + size = elements * _DTYPE_SIZE[arg.dtype] + shape_name = f"shape_{arg.name}" + constants.append(f"constexpr int k{_camel(arg.name)}Bytes = {size};") + constants.append(f"uint32_t {shape_name}[{len(arg.shape)}] = {{{', '.join(str(dim) for dim in arg.shape)}}};") + allocs.append(f" void *d_{arg.name} = nullptr;") + allocs.append(f" ACL_CHECK(aclrtMalloc(&d_{arg.name}, k{_camel(arg.name)}Bytes, ACL_MEM_MALLOC_HUGE_FIRST));") + allocs.append(f" ACL_CHECK(aclrtMemset(d_{arg.name}, k{_camel(arg.name)}Bytes, 0, k{_camel(arg.name)}Bytes));") + tensor_inits.append( + f" tensors[{i}] = make_tensor_external(d_{arg.name}, {shape_name}, {len(arg.shape)}, {_DTYPE_CPP[arg.dtype]});" + ) + frees.append(f" ACL_CHECK(aclrtFree(d_{arg.name}));") + + arg_assigns = [] + for i, arg in enumerate(tensors): + arg_assigns.append( + f" args[{arg.index}] = static_cast(reinterpret_cast(d_tensors) + {i} * sizeof(Tensor));" + ) + for arg in scalars: + arg_assigns.append(f" args[{arg.index}] = static_cast({int(arg.value)});") + + return f"""#include +#include +#include +#include + +#include "acl/acl.h" +#include "pto_orchestration_api.h" + +constexpr int kArgsSlots = 50; +constexpr int kNumTensors = {len(tensors)}; +{chr(10).join(constants)} + +#define ACL_CHECK(expr) \\ + do {{ \\ + aclError _err = (expr); \\ + if (_err != ACL_SUCCESS) {{ \\ + fprintf(stderr, "ACL error %d at %s:%d\\n", _err, __FILE__, __LINE__); \\ + exit(1); \\ + }} \\ + }} while (0) + +extern "C" void launch_replay(void *args, void *stream); + +int main() {{ + ACL_CHECK(aclInit(nullptr)); + int device_id = 0; + if (const char *env = getenv("ACL_DEVICE_ID")) {{ + device_id = atoi(env); + }} + ACL_CHECK(aclrtSetDevice(device_id)); + + aclrtStream stream; + ACL_CHECK(aclrtCreateStream(&stream)); + +{chr(10).join(allocs)} + + void *tensors_mem = malloc(kNumTensors * sizeof(Tensor)); + Tensor *tensors = static_cast(tensors_mem); +{chr(10).join(tensor_inits)} + + void *d_tensors = nullptr; + ACL_CHECK(aclrtMalloc(&d_tensors, kNumTensors * sizeof(Tensor), ACL_MEM_MALLOC_HUGE_FIRST)); + ACL_CHECK(aclrtMemcpy(d_tensors, kNumTensors * sizeof(Tensor), tensors, kNumTensors * sizeof(Tensor), ACL_MEMCPY_HOST_TO_DEVICE)); + + std::array args{{}}; +{chr(10).join(arg_assigns)} + + void *d_args = nullptr; + ACL_CHECK(aclrtMalloc(&d_args, sizeof(args), ACL_MEM_MALLOC_HUGE_FIRST)); + ACL_CHECK(aclrtMemcpy(d_args, sizeof(args), args.data(), sizeof(args), ACL_MEMCPY_HOST_TO_DEVICE)); + + launch_replay(d_args, stream); + ACL_CHECK(aclrtSynchronizeStream(stream)); + + ACL_CHECK(aclrtFree(d_args)); + ACL_CHECK(aclrtFree(d_tensors)); + free(tensors_mem); +{chr(10).join(reversed(frees))} + ACL_CHECK(aclrtDestroyStream(stream)); + ACL_CHECK(aclrtResetDevice(device_id)); + ACL_CHECK(aclFinalize()); + + printf("Replay completed successfully.\\n"); + return 0; +}} +""" + + +def render_cmake(config: TraceConfig) -> str: + return """cmake_minimum_required(VERSION 3.16) + +set(CMAKE_C_COMPILER bisheng) +set(CMAKE_CXX_COMPILER bisheng) + +project(insight_trace_replay) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_POSITION_INDEPENDENT_CODE ON) + +if(NOT DEFINED ENV{ASCEND_HOME_PATH}) + message(FATAL_ERROR "ASCEND_HOME_PATH is not set (source CANN set_env.sh first)") +endif() +set(ASCEND_HOME_PATH $ENV{ASCEND_HOME_PATH}) +set(SOC_VERSION dav_2201 CACHE STRING "Simulator SoC version") +set(PTO_ISA_ROOT $ENV{PTO_ISA_ROOT} CACHE PATH "PTO ISA root") +set(REPO_ROOT $ENV{REPO_ROOT} CACHE PATH "simpler repo root") + +add_compile_options( + -D_FORTIFY_SOURCE=2 -O2 -std=c++17 + -Wno-macro-redefined -Wno-ignored-attributes + -fstack-protector-strong -fPIC +) +add_link_options(-s -Wl,-z,relro -Wl,-z,now) + +set(CMAKE_CCE_COMPILE_OPTIONS + -xcce -fenable-matrix --cce-aicore-enable-tl -fPIC + -Xhost-start -Xhost-end + "SHELL:-mllvm -cce-aicore-stack-size=0x8000" + "SHELL:-mllvm -cce-aicore-function-stack-size=0x8000" + "SHELL:-mllvm -cce-aicore-record-overflow=true" + "SHELL:-mllvm -cce-aicore-addr-transform" + "SHELL:-mllvm -cce-aicore-dcci-insert-for-scalar=false" +) +set(CMAKE_CPP_COMPILE_OPTIONS + -xc++ + "SHELL:-include stdint.h" + "SHELL:-include stddef.h" +) + +set(COMMON_INCLUDES + ${PTO_ISA_ROOT}/include + ${PTO_ISA_ROOT}/include/pto + ${REPO_ROOT}/src/a2a3/runtime/tensormap_and_ringbuffer/runtime + ${REPO_ROOT}/src/a2a3/runtime/tensormap_and_ringbuffer/common + ${REPO_ROOT}/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration + ${REPO_ROOT}/src/common/task_interface + ${REPO_ROOT}/src/a2a3/platform/include + ${REPO_ROOT}/simpler_setup/incore + ${ASCEND_HOME_PATH}/pkg_inc + ${ASCEND_HOME_PATH}/pkg_inc/profiling + ${ASCEND_HOME_PATH}/pkg_inc/runtime/runtime + ${ASCEND_HOME_PATH}/include +) + +add_library(replay_kernel SHARED replay_kernel.cpp replay_launch.cpp) +target_compile_options(replay_kernel PRIVATE + ${CMAKE_CCE_COMPILE_OPTIONS} + --cce-aicore-arch=dav-c220 + -DREGISTER_BASE -std=c++17) +target_include_directories(replay_kernel PRIVATE ${COMMON_INCLUDES}) +target_link_options(replay_kernel PRIVATE --cce-fatobj-link) + +add_executable(replay_host replay_host.cpp) +target_compile_options(replay_host PRIVATE ${CMAKE_CPP_COMPILE_OPTIONS}) +target_include_directories(replay_host PRIVATE ${COMMON_INCLUDES}) +target_link_directories(replay_host PUBLIC + ${ASCEND_HOME_PATH}/lib64 + ${ASCEND_HOME_PATH}/aarch64-linux/simulator/${SOC_VERSION}/lib +) +target_link_libraries(replay_host PRIVATE + replay_kernel + runtime_camodel + stdc++ ascendcl m tiling_api platform c_sec dl nnopbase +) +""" + + +def render_run_collect(config: TraceConfig) -> str: + cann_default = str(config.cann_home) if config.cann_home else "" + pto_default = str(config.pto_isa_root) if config.pto_isa_root else "" + return f"""#!/usr/bin/env bash +set -euo pipefail + +CANN_HOME="${{CANN_HOME:-{cann_default}}}" +PTO_ISA_ROOT="${{PTO_ISA_ROOT:-{pto_default}}}" +REPO_ROOT="${{REPO_ROOT:-{config.repo_root}}}" +: "${{CANN_HOME:?CANN_HOME must be set}}" +: "${{PTO_ISA_ROOT:?PTO_ISA_ROOT must be set}}" +: "${{REPO_ROOT:?REPO_ROOT must be set}}" + +WS="$(cd "$(dirname "$(readlink -f "$0")")" && pwd)" +SOC_VERSION="${{SOC_VERSION:-{config.soc_version}}}" +DEVICE_ID="${{TARGET_DEVICE_ID:-${{NPU_LOCKED_DEVICE:-{config.device}}}}}" +BUILD_DIR="$WS/build" +COLLECT_DIR="$WS/msprof_collect" +EXPORT_ROOT="$WS/insight_export" + +source "$CANN_HOME/../cann/set_env.sh" 2>/dev/null \ + || source "$CANN_HOME/set_env.sh" +export ASCEND_HOME_PATH="$CANN_HOME" +SIM_LIB_DIR="$CANN_HOME/aarch64-linux/simulator/$SOC_VERSION/lib" +export LD_LIBRARY_PATH="$BUILD_DIR:$SIM_LIB_DIR:$CANN_HOME/lib64:$CANN_HOME/aarch64-linux/devlib:$CANN_HOME/devlib:${{LD_LIBRARY_PATH:-}}" +export ACL_DEVICE_ID="$DEVICE_ID" +mkdir -p "$BUILD_DIR" "$COLLECT_DIR" "$EXPORT_ROOT" +chmod 700 "$COLLECT_DIR" "$EXPORT_ROOT" + +cmake -G Ninja -S "$WS" -B "$BUILD_DIR" \ + -DSOC_VERSION="$SOC_VERSION" \ + -DPTO_ISA_ROOT="$PTO_ISA_ROOT" \ + -DREPO_ROOT="$REPO_ROOT" +cmake --build "$BUILD_DIR" --target replay_host + +msprof op simulator \ + --application="$BUILD_DIR/replay_host" \ + --kernel-name="replay_entry" \ + --launch-count={config.launch_count} \ + --soc-version="$SOC_VERSION" \ + --timeout={config.timeout} \ + --output="$COLLECT_DIR/out" \ + 2>&1 | tee "$COLLECT_DIR/msprof_collect.log" + +OPPROF_DIR="$(find "$COLLECT_DIR/out" -maxdepth 1 -mindepth 1 -type d -name 'OPPROF_*' | sort | tail -n 1)" +test -n "$OPPROF_DIR" +if [[ -d "$OPPROF_DIR/device0/tmp_dump" ]]; then + EXPORT_SRC="$OPPROF_DIR/device0/tmp_dump" +else + EXPORT_SRC="$OPPROF_DIR/dump" +fi + +msprof op simulator --export="$EXPORT_SRC" --output="$EXPORT_ROOT" \ + 2>&1 | tee "$EXPORT_ROOT/msprof_export.log" +""" + + +def render_config(config: TraceConfig) -> dict: + kernel = _require_kernel(config) + return { + "backend": config.backend.value, + "test_module": str(config.test_module) if config.test_module else None, + "case": config.case_name, + "kernel": { + "name": kernel.name, + "func_id": kernel.func_id, + "core_type": kernel.core_type, + "source": str(kernel.source_path), + "shape": kernel.shape.value, + }, + "replay": { + "hw_block_num": config.hw_block_num, + "soc_version": config.soc_version, + "timeout": config.timeout, + "launch_count": config.launch_count, + }, + "args": [_arg_to_json(arg) for arg in config.args], + } + + +def _arg_to_json(arg: TraceTensorArg | TraceScalarArg) -> dict: + if isinstance(arg, TraceTensorArg): + return {"index": arg.index, "kind": "tensor", "dtype": arg.dtype, "shape": list(arg.shape), "name": arg.name} + return {"index": arg.index, "kind": "scalar", "dtype": arg.dtype, "value": arg.value, "name": arg.name} + + +def _require_kernel(config: TraceConfig): + if config.kernel_spec is None: + raise ValueError("simpler backend requires a kernel spec") + return config.kernel_spec + + +def _camel(name: str) -> str: + return "".join(part.capitalize() for part in name.split("_")) diff --git a/simpler_setup/insight_trace/workspace.py b/simpler_setup/insight_trace/workspace.py new file mode 100644 index 000000000..57937f593 --- /dev/null +++ b/simpler_setup/insight_trace/workspace.py @@ -0,0 +1,39 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from __future__ import annotations + +import json +import stat +from pathlib import Path + +from .models import TraceConfig, TraceResult +from .templates import render_cmake, render_config, render_host, render_kernel, render_launch, render_run_collect + + +def create_workspace(config: TraceConfig) -> TraceResult: + config.output_dir.mkdir(parents=True, exist_ok=True) + files = { + "replay_kernel.cpp": render_kernel(config), + "replay_launch.cpp": render_launch(config), + "replay_host.cpp": render_host(config), + "CMakeLists.txt": render_cmake(config), + "run_collect.sh": render_run_collect(config), + "insight_trace_config.json": json.dumps(render_config(config), indent=2) + "\n", + } + for name, content in files.items(): + path = config.output_dir / name + path.write_text(content) + if name == "run_collect.sh": + path.chmod(path.stat().st_mode | stat.S_IXUSR) + return TraceResult( + workspace_dir=config.output_dir, + simulator_dir=None, + collect_log=config.output_dir / "msprof_collect" / "msprof_collect.log", + export_log=config.output_dir / "insight_export" / "msprof_export.log", + ) diff --git a/simpler_setup/tools/insight_trace.py b/simpler_setup/tools/insight_trace.py new file mode 100644 index 000000000..61af69179 --- /dev/null +++ b/simpler_setup/tools/insight_trace.py @@ -0,0 +1,12 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from simpler_setup.insight_trace.cli import main + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/ut/py/test_insight_trace_core.py b/tests/ut/py/test_insight_trace_core.py new file mode 100644 index 000000000..9bcb984e6 --- /dev/null +++ b/tests/ut/py/test_insight_trace_core.py @@ -0,0 +1,44 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +from pathlib import Path + +from simpler_setup.insight_trace.arg_resolver import resolve_args +from simpler_setup.insight_trace.case_loader import load_scene_case +from simpler_setup.insight_trace.kernel_analyzer import select_kernel +from simpler_setup.insight_trace.models import KernelShape, TraceScalarArg, TraceTensorArg + + +def _paged_attention_module() -> Path: + return Path(__file__).resolve().parents[3] / "examples/a2a3/tensormap_and_ringbuffer/paged_attention/test_paged_attention.py" + + +def test_loads_paged_attention_case(): + context = load_scene_case(_paged_attention_module(), "CaseSmall1") + assert context.case["params"]["block_size"] == 16 + assert context.runtime == "tensormap_and_ringbuffer" + + +def test_selects_sf_as_aiv_only(): + context = load_scene_case(_paged_attention_module(), "CaseSmall1") + kernel = select_kernel(context, kernel_name="SF") + assert kernel.func_id == 1 + assert kernel.shape == KernelShape.AIV_ONLY + + +def test_resolves_sf_recipe(): + context = load_scene_case(_paged_attention_module(), "CaseSmall1") + kernel = select_kernel(context, kernel_name="SF") + args = resolve_args(context, kernel) + assert args[:4] == ( + TraceTensorArg(0, "sij", "FLOAT32", (16, 16)), + TraceTensorArg(1, "pij", "BFLOAT16", (16, 16)), + TraceTensorArg(2, "mij", "FLOAT32", (16,)), + TraceTensorArg(3, "lij", "FLOAT32", (16,)), + ) + assert args[4] == TraceScalarArg(4, "scale", "FLOAT32_BITS", 1065353216, "bits") From e3e877d84d1d7885d13bb9a1bc31e0b640cacfef Mon Sep 17 00:00:00 2001 From: vegetabledoww <17863935975@163.com> Date: Tue, 19 May 2026 16:48:34 +0800 Subject: [PATCH 2/3] Fix insight trace review findings Tighten generated argument handling and PTOAS symbol/input processing so invalid replay specs fail early and float bit scalars preserve their intended encoding. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- simpler_setup/insight_trace/arg_resolver.py | 8 +++- .../insight_trace/kernel_analyzer.py | 12 +++-- simpler_setup/insight_trace/ptoas_backend.py | 40 ++++++++-------- simpler_setup/insight_trace/templates.py | 11 +++++ tests/ut/py/test_insight_trace_core.py | 46 ++++++++++++++++++- 5 files changed, 93 insertions(+), 24 deletions(-) diff --git a/simpler_setup/insight_trace/arg_resolver.py b/simpler_setup/insight_trace/arg_resolver.py index 885189e04..50044cddd 100644 --- a/simpler_setup/insight_trace/arg_resolver.py +++ b/simpler_setup/insight_trace/arg_resolver.py @@ -45,13 +45,17 @@ def _load_arg_spec(path: Path) -> tuple[TraceArg, ...]: ) ) elif item["kind"] == "scalar": + value = item["value"] + pack_mode = item.get("pack_mode", "value") + if (pack_mode == "bits" or item["dtype"] == "FLOAT32_BITS") and isinstance(value, float): + value = _f32_bits(value) result.append( TraceScalarArg( index=int(item["index"]), name=item["name"], dtype=item["dtype"], - value=item["value"], - pack_mode=item.get("pack_mode", "value"), + value=value, + pack_mode=pack_mode, ) ) else: diff --git a/simpler_setup/insight_trace/kernel_analyzer.py b/simpler_setup/insight_trace/kernel_analyzer.py index 0af32276f..ca1abef36 100644 --- a/simpler_setup/insight_trace/kernel_analyzer.py +++ b/simpler_setup/insight_trace/kernel_analyzer.py @@ -9,6 +9,7 @@ from __future__ import annotations import re +from functools import lru_cache from pathlib import Path from .models import KernelShape, KernelSpec, SceneCaseContext @@ -56,7 +57,7 @@ def select_kernel( def classify_kernel(core_type: str, source_path: Path) -> KernelShape: - source = source_path.read_text() + source = _read_source(source_path) if "SPMD_LOCAL_CONTEXT_INDEX" in source or "SPMD_GLOBAL_CONTEXT_INDEX" in source: return KernelShape.SPMD_MIX if "args[48]" in source or "args[49]" in source: @@ -73,12 +74,17 @@ def classify_kernel(core_type: str, source_path: Path) -> KernelShape: def read_arg_indices(source_path: Path) -> set[int]: - return {int(match.group(1)) for match in _ARG_READ_RE.finditer(source_path.read_text())} + return {int(match.group(1)) for match in _ARG_READ_RE.finditer(_read_source(source_path))} def validate_single_task_kernel(kernel: KernelSpec) -> None: if kernel.shape == KernelShape.SPMD_MIX: raise ValueError("SPMD mix kernels are not supported by the simpler backend yet") - source = kernel.source_path.read_text() + source = _read_source(kernel.source_path) if "kernel_entry" not in source or "int64_t *args" not in source: raise ValueError(f"Kernel does not look like kernel_entry(args): {kernel.source_path}") + + +@lru_cache(maxsize=None) +def _read_source(source_path: Path) -> str: + return source_path.resolve().read_text() diff --git a/simpler_setup/insight_trace/ptoas_backend.py b/simpler_setup/insight_trace/ptoas_backend.py index fb1122e97..c3b3506d9 100644 --- a/simpler_setup/insight_trace/ptoas_backend.py +++ b/simpler_setup/insight_trace/ptoas_backend.py @@ -127,32 +127,36 @@ def collect_ptoas_trace(config: PtoasTraceConfig) -> TraceResult: def resolve_kernel_symbol(kernel_lib: Path, kernel_base_name: str) -> str: result = subprocess.run(["nm", "-D", str(kernel_lib)], check=True, capture_output=True, text=True) - candidates = [] + symbols = [] for line in result.stdout.splitlines(): parts = line.split() - if len(parts) < 3 or parts[-2] not in {"T", "W"}: - continue - symbol = parts[-1] - demangled = subprocess.run(["c++filt", symbol], check=False, capture_output=True, text=True).stdout.strip() - if demangled.startswith(f"{kernel_base_name}("): - candidates.append(symbol) + if len(parts) >= 3 and parts[-2] in {"T", "W"}: + symbols.append(parts[-1]) + if not symbols: + raise ValueError(f"No exported text symbols found in {kernel_lib}") + + proc = subprocess.run( + ["c++filt"], + input="\n".join(symbols), + check=True, + capture_output=True, + text=True, + ) + demangled = proc.stdout.splitlines() + candidates = [symbol for symbol, name in zip(symbols, demangled) if name.startswith(f"{kernel_base_name}(")] if len(candidates) != 1: - raise ValueError(f"Expected one symbol for {kernel_base_name}, found {len(candidates)}: {candidates}") + matches = [f"{symbol}: {name}" for symbol, name in zip(symbols, demangled) if kernel_base_name in name] + raise ValueError(f"Expected one symbol for {kernel_base_name}, found {len(candidates)}: {matches}") return candidates[0] def _prepare_inputs(case_dir: Path) -> None: golden = case_dir / "golden.py" - if golden.is_file(): - result = subprocess.run(["python3", str(golden)], cwd=case_dir, check=False) - if result.returncode == 0: - return - main_cpp = case_dir / "main.cpp" - if not main_cpp.is_file(): - return - for line in main_cpp.read_text().splitlines(): - if ".bin" not in line or "fileSize_" not in line: - continue + if not golden.is_file(): + raise FileNotFoundError(f"PTOAS golden input generator not found: {golden}") + result = subprocess.run(["python3", str(golden)], cwd=case_dir, check=False) + if result.returncode != 0: + raise RuntimeError("golden.py failed; PTOAS zero-input fallback is not implemented") def _export_source(out_dir: Path) -> Path: diff --git a/simpler_setup/insight_trace/templates.py b/simpler_setup/insight_trace/templates.py index 1f9f16b83..664defdef 100644 --- a/simpler_setup/insight_trace/templates.py +++ b/simpler_setup/insight_trace/templates.py @@ -76,6 +76,7 @@ def render_launch(config: TraceConfig) -> str: def render_host(config: TraceConfig) -> str: + _validate_args(config.args) tensors = [arg for arg in config.args if isinstance(arg, TraceTensorArg)] scalars = [arg for arg in config.args if isinstance(arg, TraceScalarArg)] constants = [] @@ -338,6 +339,16 @@ def _arg_to_json(arg: TraceTensorArg | TraceScalarArg) -> dict: return {"index": arg.index, "kind": "scalar", "dtype": arg.dtype, "value": arg.value, "name": arg.name} +def _validate_args(args: tuple[TraceTensorArg | TraceScalarArg, ...]) -> None: + seen = set() + for arg in args: + if arg.index < 0 or arg.index >= 50: + raise ValueError(f"Arg index {arg.index} exceeds max slots (50)") + if arg.index in seen: + raise ValueError(f"Duplicate arg index {arg.index}") + seen.add(arg.index) + + def _require_kernel(config: TraceConfig): if config.kernel_spec is None: raise ValueError("simpler backend requires a kernel spec") diff --git a/tests/ut/py/test_insight_trace_core.py b/tests/ut/py/test_insight_trace_core.py index 9bcb984e6..465655e8b 100644 --- a/tests/ut/py/test_insight_trace_core.py +++ b/tests/ut/py/test_insight_trace_core.py @@ -8,10 +8,13 @@ # ----------------------------------------------------------------------------------------------------------- from pathlib import Path +import pytest + from simpler_setup.insight_trace.arg_resolver import resolve_args from simpler_setup.insight_trace.case_loader import load_scene_case from simpler_setup.insight_trace.kernel_analyzer import select_kernel -from simpler_setup.insight_trace.models import KernelShape, TraceScalarArg, TraceTensorArg +from simpler_setup.insight_trace.models import KernelShape, TraceBackend, TraceConfig, TraceScalarArg, TraceTensorArg +from simpler_setup.insight_trace.templates import render_host def _paged_attention_module() -> Path: @@ -42,3 +45,44 @@ def test_resolves_sf_recipe(): TraceTensorArg(3, "lij", "FLOAT32", (16,)), ) assert args[4] == TraceScalarArg(4, "scale", "FLOAT32_BITS", 1065353216, "bits") + + +def test_load_arg_spec_packs_float_bits(tmp_path): + spec = tmp_path / "args.json" + spec.write_text( + '{"args":[{"kind":"scalar","index":4,"name":"scale","dtype":"FLOAT32_BITS","value":1.0,"pack_mode":"bits"}]}' + ) + args = resolve_args(None, None, spec) + assert args == (TraceScalarArg(4, "scale", "FLOAT32_BITS", 1065353216, "bits"),) + + +def test_render_host_rejects_out_of_bounds_arg_index(tmp_path): + config = TraceConfig( + backend=TraceBackend.SIMPLER, + test_module=None, + case_name="case", + kernel_spec=None, + args=(TraceScalarArg(50, "bad", "UINT64", 1),), + output_dir=tmp_path, + repo_root=tmp_path, + cann_home=None, + pto_isa_root=None, + ) + with pytest.raises(ValueError, match="exceeds max slots"): + render_host(config) + + +def test_render_host_rejects_duplicate_arg_index(tmp_path): + config = TraceConfig( + backend=TraceBackend.SIMPLER, + test_module=None, + case_name="case", + kernel_spec=None, + args=(TraceScalarArg(1, "a", "UINT64", 1), TraceScalarArg(1, "b", "UINT64", 2)), + output_dir=tmp_path, + repo_root=tmp_path, + cann_home=None, + pto_isa_root=None, + ) + with pytest.raises(ValueError, match="Duplicate arg index"): + render_host(config) \ No newline at end of file From c7ba1b1b010249a7505e435cb5567b2068bee10a Mon Sep 17 00:00:00 2001 From: vegetabledoww <17863935975@163.com> Date: Wed, 20 May 2026 15:04:40 +0800 Subject: [PATCH 3/3] feat(insight_trace): support --dump-dir and --dispatch-id for kernel args dump input Read kernel_args_dump.json directly from a dump directory, select the matching func_id+dispatch_id dispatch, skip local/global context slots, and convert tensor/scalar args to TraceArg objects. This removes the manual --arg-spec JSON conversion step. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- simpler_setup/insight_trace/arg_resolver.py | 57 ++++++++++++++++++++- simpler_setup/insight_trace/cli.py | 6 ++- tests/ut/py/test_insight_trace_core.py | 34 +++++++++++- 3 files changed, 94 insertions(+), 3 deletions(-) diff --git a/simpler_setup/insight_trace/arg_resolver.py b/simpler_setup/insight_trace/arg_resolver.py index 50044cddd..d74b457c9 100644 --- a/simpler_setup/insight_trace/arg_resolver.py +++ b/simpler_setup/insight_trace/arg_resolver.py @@ -18,9 +18,19 @@ from .models import KernelSpec, SceneCaseContext, TraceArg, TraceScalarArg, TraceTensorArg -def resolve_args(context: SceneCaseContext, kernel: KernelSpec, arg_spec: Path | None = None) -> tuple[TraceArg, ...]: +def resolve_args( + context: SceneCaseContext, + kernel: KernelSpec, + arg_spec: Path | None = None, + dump_dir: Path | None = None, + dispatch_id: int | None = None, +) -> tuple[TraceArg, ...]: if arg_spec is not None: return _load_arg_spec(arg_spec) + if dump_dir is not None: + if dispatch_id is None: + raise ValueError("--dispatch-id is required with --dump-dir") + return load_kernel_dump_args(dump_dir, kernel.func_id, dispatch_id) args = _paged_attention_recipe(context, kernel) read_indices = read_arg_indices(kernel.source_path) missing = sorted(index for index in read_indices if index not in {arg.index for arg in args}) @@ -29,6 +39,51 @@ def resolve_args(context: SceneCaseContext, kernel: KernelSpec, arg_spec: Path | return args +def load_kernel_dump_args(dump_dir: Path, func_id: int, dispatch_id: int) -> tuple[TraceArg, ...]: + dump_path = _kernel_dump_path(dump_dir) + raw = json.loads(dump_path.read_text()) + dispatch = None + for item in raw.get("dispatches", []): + if int(item.get("func_id", -1)) == func_id and int(item.get("dispatch_id", -1)) == dispatch_id: + dispatch = item + break + if dispatch is None: + raise ValueError(f"No kernel args dump dispatch matches func_id={func_id}, dispatch_id={dispatch_id}") + + result: list[TraceArg] = [] + for item in dispatch.get("args", []): + index = int(item["arg_index"]) + kind = item["kind"] + if kind == "tensor": + result.append( + TraceTensorArg( + index=index, + name=f"arg{index}", + dtype=item["dtype"], + shape=tuple(int(dim) for dim in item["shape"]), + ) + ) + elif kind == "scalar": + value = item["value"] + pack_mode = item.get("pack_mode", "value") + if (pack_mode == "bits" or item["dtype"] == "FLOAT32_BITS") and isinstance(value, float): + value = _f32_bits(value) + result.append(TraceScalarArg(index, f"arg{index}", item["dtype"], value, pack_mode)) + elif kind in {"local_context", "global_context"}: + continue + else: + raise ValueError(f"Unknown kernel dump arg kind: {kind}") + return tuple(sorted(result, key=lambda arg: arg.index)) + + +def _kernel_dump_path(dump_dir: Path) -> Path: + candidates = (dump_dir / "kernel_args_dump.json", dump_dir / "tensor_dump" / "kernel_args_dump.json") + for path in candidates: + if path.is_file(): + return path + raise ValueError(f"kernel_args_dump.json not found under {dump_dir}") + + def _load_arg_spec(path: Path) -> tuple[TraceArg, ...]: raw = json.loads(path.read_text()) result: list[TraceArg] = [] diff --git a/simpler_setup/insight_trace/cli.py b/simpler_setup/insight_trace/cli.py index 89c891c96..dcde4b5c8 100644 --- a/simpler_setup/insight_trace/cli.py +++ b/simpler_setup/insight_trace/cli.py @@ -62,6 +62,8 @@ def _build_parser() -> argparse.ArgumentParser: parser.add_argument("--timeout", type=int, default=120) parser.add_argument("--hw-block-num", type=int, default=1) parser.add_argument("--arg-spec", type=Path) + parser.add_argument("--dump-dir", type=Path) + parser.add_argument("--dispatch-id", type=int) parser.add_argument("--dry-run", action="store_true") parser.add_argument("--ptoas-root", type=Path) parser.add_argument("--source-cpp", type=Path) @@ -77,7 +79,9 @@ def _run_simpler(args: argparse.Namespace): context = load_scene_case(args.test_module, args.case) kernel = select_kernel(context, args.kernel, args.func_id, args.kernel_source) validate_single_task_kernel(kernel) - trace_args = resolve_args(context, kernel, args.arg_spec) + if args.dump_dir is not None and (args.func_id is None or args.dispatch_id is None): + raise ValueError("simpler backend requires --func-id and --dispatch-id with --dump-dir") + trace_args = resolve_args(context, kernel, args.arg_spec, args.dump_dir, args.dispatch_id) output_dir = args.output_dir or _default_output_dir(args.case, kernel.name) config = TraceConfig( backend=TraceBackend.SIMPLER, diff --git a/tests/ut/py/test_insight_trace_core.py b/tests/ut/py/test_insight_trace_core.py index 465655e8b..951dbe7fe 100644 --- a/tests/ut/py/test_insight_trace_core.py +++ b/tests/ut/py/test_insight_trace_core.py @@ -10,7 +10,7 @@ import pytest -from simpler_setup.insight_trace.arg_resolver import resolve_args +from simpler_setup.insight_trace.arg_resolver import load_kernel_dump_args, resolve_args from simpler_setup.insight_trace.case_loader import load_scene_case from simpler_setup.insight_trace.kernel_analyzer import select_kernel from simpler_setup.insight_trace.models import KernelShape, TraceBackend, TraceConfig, TraceScalarArg, TraceTensorArg @@ -56,6 +56,38 @@ def test_load_arg_spec_packs_float_bits(tmp_path): assert args == (TraceScalarArg(4, "scale", "FLOAT32_BITS", 1065353216, "bits"),) +def test_load_kernel_dump_args_skips_context_slots(tmp_path): + dump = tmp_path / "tensor_dump" + dump.mkdir() + (dump / "kernel_args_dump.json").write_text( + """ + { + "dispatches": [ + {"dispatch_id": 7, "func_id": 3, "args": [ + {"arg_index": 2, "kind": "tensor", "dtype": "FLOAT32", "shape": [16]}, + {"arg_index": 48, "kind": "local_context", "dtype": "UINT64"}, + {"arg_index": 0, "kind": "tensor", "dtype": "BFLOAT16", "shape": [16, 16]}, + {"arg_index": 3, "kind": "scalar", "dtype": "UINT64", "value": 1, "pack_mode": "bits"}, + {"arg_index": 49, "kind": "global_context", "dtype": "UINT64"} + ]} + ] + } + """ + ) + args = load_kernel_dump_args(tmp_path, func_id=3, dispatch_id=7) + assert args == ( + TraceTensorArg(0, "arg0", "BFLOAT16", (16, 16)), + TraceTensorArg(2, "arg2", "FLOAT32", (16,)), + TraceScalarArg(3, "arg3", "UINT64", 1, "bits"), + ) + + +def test_load_kernel_dump_args_requires_matching_func_and_dispatch(tmp_path): + (tmp_path / "kernel_args_dump.json").write_text('{"dispatches":[{"dispatch_id":1,"func_id":2,"args":[]}]}') + with pytest.raises(ValueError, match="func_id=3, dispatch_id=1"): + load_kernel_dump_args(tmp_path, func_id=3, dispatch_id=1) + + def test_render_host_rejects_out_of_bounds_arg_index(tmp_path): config = TraceConfig( backend=TraceBackend.SIMPLER,