Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions simpler_setup/insight_trace/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright (c) PyPTO Contributors.
# This program is free software, you can redistribute it and/or modify it under the terms and conditions of
# CANN Open Software License Agreement Version 2.0 (the "License").
# Please refer to the License for details. You may not use this file except in compliance with the License.
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
# See LICENSE in the root of the software repository for the full text of the License.
# -----------------------------------------------------------------------------------------------------------
"""MindStudio Insight trace workspace generation."""
181 changes: 181 additions & 0 deletions simpler_setup/insight_trace/arg_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# Copyright (c) PyPTO Contributors.
# This program is free software, you can redistribute it and/or modify it under the terms and conditions of
# CANN Open Software License Agreement Version 2.0 (the "License").
# Please refer to the License for details. You may not use this file except in compliance with the License.
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
# See LICENSE in the root of the software repository for the full text of the License.
# -----------------------------------------------------------------------------------------------------------
from __future__ import annotations

import ctypes
import json
import struct
from pathlib import Path
from typing import Any

from .kernel_analyzer import read_arg_indices
from .models import KernelSpec, SceneCaseContext, TraceArg, TraceScalarArg, TraceTensorArg


def resolve_args(
context: SceneCaseContext,
kernel: KernelSpec,
arg_spec: Path | None = None,
dump_dir: Path | None = None,
dispatch_id: int | None = None,
) -> tuple[TraceArg, ...]:
if arg_spec is not None:
return _load_arg_spec(arg_spec)
if dump_dir is not None:
if dispatch_id is None:
raise ValueError("--dispatch-id is required with --dump-dir")
return load_kernel_dump_args(dump_dir, kernel.func_id, dispatch_id)
args = _paged_attention_recipe(context, kernel)
read_indices = read_arg_indices(kernel.source_path)
missing = sorted(index for index in read_indices if index not in {arg.index for arg in args})
if missing:
raise ValueError(f"Argument recipe for {kernel.name} does not cover args indices: {missing}")
return args


def load_kernel_dump_args(dump_dir: Path, func_id: int, dispatch_id: int) -> tuple[TraceArg, ...]:
dump_path = _kernel_dump_path(dump_dir)
raw = json.loads(dump_path.read_text())
dispatch = None
for item in raw.get("dispatches", []):
if int(item.get("func_id", -1)) == func_id and int(item.get("dispatch_id", -1)) == dispatch_id:
dispatch = item
break
if dispatch is None:
raise ValueError(f"No kernel args dump dispatch matches func_id={func_id}, dispatch_id={dispatch_id}")

result: list[TraceArg] = []
for item in dispatch.get("args", []):
index = int(item["arg_index"])
kind = item["kind"]
if kind == "tensor":
result.append(
TraceTensorArg(
index=index,
name=f"arg{index}",
dtype=item["dtype"],
shape=tuple(int(dim) for dim in item["shape"]),
)
)
elif kind == "scalar":
value = item["value"]
pack_mode = item.get("pack_mode", "value")
if (pack_mode == "bits" or item["dtype"] == "FLOAT32_BITS") and isinstance(value, float):
value = _f32_bits(value)
result.append(TraceScalarArg(index, f"arg{index}", item["dtype"], value, pack_mode))
elif kind in {"local_context", "global_context"}:
continue
else:
raise ValueError(f"Unknown kernel dump arg kind: {kind}")
return tuple(sorted(result, key=lambda arg: arg.index))


def _kernel_dump_path(dump_dir: Path) -> Path:
candidates = (dump_dir / "kernel_args_dump.json", dump_dir / "tensor_dump" / "kernel_args_dump.json")
for path in candidates:
if path.is_file():
return path
raise ValueError(f"kernel_args_dump.json not found under {dump_dir}")


def _load_arg_spec(path: Path) -> tuple[TraceArg, ...]:
raw = json.loads(path.read_text())
result: list[TraceArg] = []
for item in raw.get("args", raw):
if item["kind"] == "tensor":
result.append(
TraceTensorArg(
index=int(item["index"]),
name=item["name"],
dtype=item["dtype"],
shape=tuple(int(dim) for dim in item["shape"]),
role=item.get("role", "input"),
fill=item.get("fill", "zero"),
)
)
elif item["kind"] == "scalar":
value = item["value"]
pack_mode = item.get("pack_mode", "value")
if (pack_mode == "bits" or item["dtype"] == "FLOAT32_BITS") and isinstance(value, float):
value = _f32_bits(value)
result.append(
TraceScalarArg(
index=int(item["index"]),
name=item["name"],
dtype=item["dtype"],
value=value,
pack_mode=pack_mode,
)
)
Comment on lines +102 to +115
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When loading scalar arguments from a JSON specification, float values intended to be passed as bit patterns (e.g., for FLOAT32 kernel arguments) are currently truncated to integers by the template renderer. If pack_mode is set to "bits", the float value should be converted to its IEEE 754 bit representation here to ensure the generated C++ code receives the correct data.

Suggested change
elif item["kind"] == "scalar":
result.append(
TraceScalarArg(
index=int(item["index"]),
name=item["name"],
dtype=item["dtype"],
value=item["value"],
pack_mode=item.get("pack_mode", "value"),
)
)
elif item["kind"] == "scalar":
val = item["value"]
pack_mode = item.get("pack_mode", "value")
if pack_mode == "bits" and isinstance(val, float):
val = _f32_bits(val)
result.append(
TraceScalarArg(
index=int(item["index"]),
name=item["name"],
dtype=item["dtype"],
value=val,
pack_mode=pack_mode,
)
)

else:
raise ValueError(f"Unknown arg kind: {item['kind']}")
return tuple(sorted(result, key=lambda arg: arg.index))


def _paged_attention_recipe(context: SceneCaseContext, kernel: KernelSpec) -> tuple[TraceArg, ...]:
if "paged_attention" not in context.module_dir.as_posix():
raise ValueError("No built-in insight trace recipe for this test module; pass --arg-spec")
params = context.case.get("params", {})
q_tile = 16
block_size = int(params["block_size"])
head_dim = int(params["head_dim"])
scale = _scalar_value(context, "scale", default=1.0)

recipes: dict[str, tuple[TraceArg, ...]] = {
"QK": (
TraceTensorArg(0, "qi", "BFLOAT16", (q_tile, head_dim)),
TraceTensorArg(1, "kj", "BFLOAT16", (block_size, head_dim)),
TraceTensorArg(2, "sij", "FLOAT32", (q_tile, block_size)),
),
"SF": (
TraceTensorArg(0, "sij", "FLOAT32", (q_tile, block_size)),
TraceTensorArg(1, "pij", "BFLOAT16", (q_tile, block_size)),
TraceTensorArg(2, "mij", "FLOAT32", (q_tile,)),
TraceTensorArg(3, "lij", "FLOAT32", (q_tile,)),
TraceScalarArg(4, "scale", "FLOAT32_BITS", _f32_bits(float(scale)), "bits"),
),
"PV": (
TraceTensorArg(0, "pij", "BFLOAT16", (q_tile, block_size)),
TraceTensorArg(1, "vj", "BFLOAT16", (block_size, head_dim)),
TraceTensorArg(2, "oi_new", "FLOAT32", (q_tile, head_dim)),
),
"UP": (
TraceTensorArg(0, "mij", "FLOAT32", (q_tile,)),
TraceTensorArg(1, "lij", "FLOAT32", (q_tile,)),
TraceTensorArg(2, "oi_new", "FLOAT32", (q_tile, head_dim)),
TraceTensorArg(3, "mi", "FLOAT32", (q_tile,)),
TraceTensorArg(4, "li", "FLOAT32", (q_tile,)),
TraceTensorArg(5, "oi", "FLOAT32", (q_tile, head_dim)),
TraceTensorArg(6, "dst", "FLOAT32", (q_tile, head_dim)),
TraceScalarArg(7, "is_first", "UINT64", 1),
TraceScalarArg(8, "is_last", "UINT64", 1),
),
}
if kernel.name not in recipes:
raise ValueError(f"No paged_attention recipe for kernel {kernel.name}")
return recipes[kernel.name]


def _scalar_value(context: SceneCaseContext, name: str, default: Any) -> Any:
try:
builder = context.test_class().generate_args(context.case.get("params", {}))
except Exception: # noqa: BLE001
return default
for spec in getattr(builder, "specs", []):
if getattr(spec, "name", None) != name:
continue
value = spec.value
if isinstance(value, ctypes._SimpleCData):
return value.value
return value
return default


def _f32_bits(value: float) -> int:
return struct.unpack("I", struct.pack("f", value))[0]
59 changes: 59 additions & 0 deletions simpler_setup/insight_trace/case_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright (c) PyPTO Contributors.
# This program is free software, you can redistribute it and/or modify it under the terms and conditions of
# CANN Open Software License Agreement Version 2.0 (the "License").
# Please refer to the License for details. You may not use this file except in compliance with the License.
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
# See LICENSE in the root of the software repository for the full text of the License.
# -----------------------------------------------------------------------------------------------------------
from __future__ import annotations

import importlib.util
import inspect
import sys
from pathlib import Path
from types import ModuleType

from .models import SceneCaseContext


def load_module(path: Path) -> ModuleType:
path = path.resolve()
spec = importlib.util.spec_from_file_location(path.stem, str(path))
if spec is None or spec.loader is None:
raise ValueError(f"Cannot load test module: {path}")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module


def find_scene_test_class(module: ModuleType) -> type:
candidates = []
for obj in module.__dict__.values():
if inspect.isclass(obj) and hasattr(obj, "CALLABLE") and hasattr(obj, "CASES"):
if getattr(obj, "_st_level", None) == 2:
candidates.append(obj)
if not candidates:
raise ValueError("No level-2 SceneTestCase class found")
if len(candidates) > 1:
names = ", ".join(cls.__name__ for cls in candidates)
raise ValueError(f"Multiple SceneTestCase classes found: {names}")
return candidates[0]


def load_scene_case(test_module: Path, case_name: str) -> SceneCaseContext:
module = load_module(test_module)
test_class = find_scene_test_class(module)
case = next((case for case in test_class.CASES if case.get("name") == case_name), None)
if case is None:
available = ", ".join(case.get("name", "<unnamed>") for case in test_class.CASES)
raise ValueError(f"Unknown case {case_name!r}; available cases: {available}")
return SceneCaseContext(
test_class=test_class,
case=case,
callable_spec=test_class.CALLABLE,
test_module=test_module.resolve(),
module_dir=test_module.resolve().parent,
runtime=getattr(test_class, "_st_runtime", ""),
)
Loading