Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/benchmark-multinode-tmpl.yml
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,12 @@ jobs:

- name: Verify eval scores
if: ${{ (success() || failure()) && inputs.eval-only }}
run: python3 utils/evals/validate_scores.py
run: |
expected_concs="${EVAL_CONC}"
if [[ -z "${expected_concs}" ]]; then
expected_concs="$(printf '%s\n' "${CONC_LIST}" | tr ' ' '\n' | sort -n | tail -1)"
fi
python3 utils/evals/validate_scores.py --expected-concs "${expected_concs}"

- name: Cleanup eval outputs (post-upload)
if: ${{ always() && (inputs.run-eval || inputs.eval-only) }}
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/multi_node/amd_utils/env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ else

export MORI_EP_LAUNCH_CONFIG_MODE=AUTO

export MORI_APP_LOG_LEVEL=INFO
# Default to WARNING to cut per-op MoRI log spam on long multinode/eval
# runs; override with MORI_APP_LOG_LEVEL=INFO when debugging.
export MORI_APP_LOG_LEVEL="${MORI_APP_LOG_LEVEL:-WARNING}"

# Router logging control:
# 0 (default) keeps noisy per-request access logs out of stdout while still logging to file.
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/multi_node/amd_utils/job.slurm
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ DOCKER_ENV_COMMON=(
-e WS_PATH=${WS_PATH}
-e RUN_EVAL=\$RUN_EVAL
-e EVAL_ONLY=\$EVAL_ONLY
-e EVAL_CONC
-e \"EVAL_CONC=\$EVAL_CONC\"
-e FRAMEWORK=\$FRAMEWORK
-e PRECISION=\$PRECISION
-e MODEL_PREFIX=\$MODEL_PREFIX
Expand Down
3 changes: 3 additions & 0 deletions benchmarks/multi_node/amd_utils/server_sglang.sh
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ if [ "$NODE_RANK" -eq 0 ]; then
--host 0.0.0.0 \
--port 8000 \
--trust-remote-code \
--log-level ${SGLANG_SERVER_LOG_LEVEL:-warning} \
${PREFILL_SERVER_CONFIG} "

if [ "$PREFILL_NODES_PER_WORKER" -gt 1 ]; then
Expand Down Expand Up @@ -657,6 +658,7 @@ elif [ "$NODE_RANK" -gt 0 ] && [ "$NODE_RANK" -lt "$NODE_OFFSET" ]; then
--host 0.0.0.0 \
--port 8000 \
--trust-remote-code \
--log-level ${SGLANG_SERVER_LOG_LEVEL:-warning} \
${PREFILL_SERVER_CONFIG} "

if [ "$PREFILL_NODES_PER_WORKER" -gt 1 ]; then
Expand Down Expand Up @@ -725,6 +727,7 @@ else
--host 0.0.0.0 \
--port 8000 \
--trust-remote-code \
--log-level ${SGLANG_SERVER_LOG_LEVEL:-warning} \
${DECODE_SERVER_CONFIG} "

if [ "$DECODE_NODES_PER_WORKER" -gt 1 ]; then
Expand Down
84 changes: 76 additions & 8 deletions utils/evals/test_batched_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def test_batched_eval_requires_a_valid_manifest(tmp_path: Path) -> None:
assert any("unavailable or invalid" in error for error in errors)


def test_validate_scores_warns_when_batch_status_metadata_is_unreadable(
def test_validate_scores_fails_when_expected_batch_metadata_is_unreadable(
tmp_path: Path,
monkeypatch,
capsys,
Expand All @@ -157,18 +157,77 @@ def test_validate_scores_warns_when_batch_status_metadata_is_unreadable(
str(meta_path),
"--results-glob",
str(result_path),
"--expected-concs",
"1 4 8",
],
)

assert validate_scores_main() == 0
assert validate_scores_main() == 1
captured = capsys.readouterr()
assert (
"WARN: could not inspect eval metadata for batched concurrency status"
in captured.err
assert "unavailable or invalid" in captured.err


def test_workflow_concurrencies_are_independent_of_eval_metadata(
tmp_path: Path,
) -> None:
meta_path = tmp_path / "meta_env.json"
meta_path.write_text(json.dumps({
"eval_concs": [8],
"completed_eval_concs": [8],
"failed_eval_concs": [],
}))
result_path = tmp_path / "results_test_conc8.json"
result_path.write_text('{"results": {}}')

errors = validate_batch_manifest(
str(meta_path),
[str(result_path)],
expected_concs=[1, 4, 8],
)

assert "batched eval metadata does not match workflow concurrencies" in errors
assert any("missing completed concurrency: 1, 4" in error for error in errors)
assert any("missing result files for concurrency: 1, 4" in error for error in errors)


def test_amd_multinode_container_inherits_eval_concurrency_list() -> None:
def test_validate_scores_checks_threshold_for_every_concurrency(
tmp_path: Path,
monkeypatch,
capsys,
) -> None:
(tmp_path / "meta_env.json").write_text(json.dumps({
"eval_concs": [1, 4],
"completed_eval_concs": [1, 4],
"failed_eval_concs": [],
}))
for conc, score in ((1, 0.9), (4, 0.8)):
(tmp_path / f"results_test_conc{conc}.json").write_text(json.dumps({
"results": {
"gsm8k": {
"exact_match,strict-match": score,
},
},
}))
monkeypatch.setattr(sys, "argv", [
"validate_scores.py",
"--meta-env",
str(tmp_path / "meta_env.json"),
"--results-glob",
str(tmp_path / "results*.json"),
"--expected-concs",
"1 4",
])

assert validate_scores_main() == 1

# Each score line is attributed to the concurrency that produced it, so a
# failing concurrency is identifiable from the log (conc 4 here).
captured = capsys.readouterr()
assert "PASS: [conc=1] gsm8k exact_match,strict-match" in captured.out
assert "FAIL: [conc=4] gsm8k exact_match,strict-match" in captured.err


def test_amd_multinode_container_forwards_eval_concurrency_list() -> None:
job_slurm = (
Path(__file__).resolve().parents[2]
/ "benchmarks"
Expand All @@ -178,5 +237,14 @@ def test_amd_multinode_container_inherits_eval_concurrency_list() -> None:
)
contents = job_slurm.read_text()

assert "-e EVAL_CONC\n" in contents
assert r"-e EVAL_CONC=\$EVAL_CONC" not in contents
assert r'-e \"EVAL_CONC=\$EVAL_CONC\"' in contents
assert "-e EVAL_CONC\n" not in contents

workflow = (
Path(__file__).resolve().parents[2]
/ ".github"
/ "workflows"
/ "benchmark-multinode-tmpl.yml"
).read_text()
assert 'expected_concs="${EVAL_CONC}"' in workflow
assert 'validate_scores.py --expected-concs "${expected_concs}"' in workflow
74 changes: 65 additions & 9 deletions utils/evals/validate_scores.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,14 @@ def resolve_threshold(config: dict, prefix: str | None, task: str, fallback: flo
def validate_batch_manifest(
meta_env_path: str,
result_files: list[str],
expected_concs: list[int] | None = None,
) -> list[str]:
"""Validate that a batched eval produced every requested concurrency."""
try:
with open(meta_env_path) as f:
meta = json.load(f)
except (json.JSONDecodeError, OSError) as exc:
if any(
if expected_concs is not None or any(
CONC_SUFFIX_RE.search(Path(result_file).name)
for result_file in result_files
):
Expand All @@ -107,29 +108,44 @@ def validate_batch_manifest(
]
return []

if expected_concs is not None and "eval_concs" not in meta:
if len(expected_concs) > 1:
return ["workflow requested multiple concurrencies but batched eval metadata is missing"]
errors = []
if meta.get("conc") != expected_concs[0]:
errors.append("eval metadata concurrency does not match workflow request")
if len(result_files) != 1:
errors.append("eval must produce exactly one result file")
return errors
if "eval_concs" not in meta:
return []

expected = meta.get("eval_concs")
metadata_expected = meta.get("eval_concs")
completed = meta.get("completed_eval_concs")
failed = meta.get("failed_eval_concs")
if not all(isinstance(values, list) for values in (expected, completed, failed)):
if not all(
isinstance(values, list)
for values in (metadata_expected, completed, failed)
):
return ["batched eval metadata must contain list-valued concurrency fields"]
if not all(
isinstance(value, int) and value > 0
for values in (expected, completed, failed)
for values in (metadata_expected, completed, failed)
for value in values
):
return ["batched eval metadata contains an invalid concurrency"]

errors = []
expected_set = set(expected)
metadata_expected_set = set(metadata_expected)
expected_set = set(expected_concs or metadata_expected)
completed_set = set(completed)
failed_set = set(failed)
if len(expected_set) != len(expected):
if len(metadata_expected_set) != len(metadata_expected):
errors.append("batched eval metadata contains duplicate expected concurrencies")
if len(completed_set) != len(completed):
errors.append("batched eval metadata contains duplicate completed concurrencies")
if expected_concs is not None and metadata_expected_set != expected_set:
errors.append("batched eval metadata does not match workflow concurrencies")
if failed_set:
errors.append(
"batched eval failed for concurrency: "
Expand Down Expand Up @@ -175,6 +191,19 @@ def validate_batch_manifest(


def main() -> int:
# CI merges this script's stdout and stderr into a single log. When stdout
# is a pipe it is block-buffered by default and only flushes at exit, which
# pushes the informational header (e.g. "Loaded thresholds...") below the
# unbuffered stderr FAIL lines. Force line buffering on both streams so
# every line reaches the log in emission order.
for _stream in (sys.stdout, sys.stderr):
try:
_stream.reconfigure(line_buffering=True)
except (AttributeError, ValueError):
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
# Best-effort only: some wrapped streams (e.g. pytest's capture
# object) don't support reconfigure; leave their buffering as-is.
pass

parser = argparse.ArgumentParser(description="Validate eval scores")
parser.add_argument(
"--min-score", type=float, default=0.85,
Expand All @@ -200,8 +229,27 @@ def main() -> int:
"--results-glob", default="results*.json",
help="Glob pattern for result files (default: 'results*.json')",
)
parser.add_argument(
"--expected-concs",
default=None,
help="Space-separated concurrencies requested by the workflow",
)
args = parser.parse_args()

expected_concs = None
if args.expected_concs is not None:
try:
expected_concs = [int(value) for value in args.expected_concs.split()]
except ValueError:
expected_concs = []
if (
not expected_concs
or any(value <= 0 for value in expected_concs)
or len(set(expected_concs)) != len(expected_concs)
):
print("FAIL: expected concurrencies must be unique positive integers", file=sys.stderr)
return 1

# Load thresholds config
config = {"default": {}, "models": {}}
thresholds_path = args.thresholds
Expand Down Expand Up @@ -229,7 +277,11 @@ def main() -> int:
checked = 0
result_files = sorted(glob.glob(args.results_glob))

manifest_errors = validate_batch_manifest(args.meta_env, result_files)
manifest_errors = validate_batch_manifest(
args.meta_env,
result_files,
expected_concs,
)
for error in manifest_errors:
print(f"FAIL: {error}", file=sys.stderr)
failed = True
Expand All @@ -246,6 +298,8 @@ def main() -> int:
)

for f in result_files:
match = CONC_SUFFIX_RE.search(Path(f).name)
conc_label = f"[conc={match.group(1)}] " if match else ""
with open(f) as fh:
data = json.load(fh)
for task, metrics in data.get("results", {}).items():
Expand All @@ -258,12 +312,14 @@ def main() -> int:
checked += 1
if val < min_score:
print(
f"FAIL: {task} {name} = {val:.4f} (< {min_score} from {source})",
f"FAIL: {conc_label}{task} {name} = {val:.4f} (< {min_score} from {source})",
file=sys.stderr,
)
failed = True
else:
print(f"PASS: {task} {name} = {val:.4f} (>= {min_score} from {source})")
print(
f"PASS: {conc_label}{task} {name} = {val:.4f} (>= {min_score} from {source})"
)

if checked == 0:
print("WARN: no metrics matched prefix '{}'".format(args.metric_prefix), file=sys.stderr)
Expand Down
16 changes: 15 additions & 1 deletion utils/matrix_logic/generate_sweep_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,34 @@ def _eligible_eval_concs(entry):


def mark_all_eval_entries(matrix_values: list[dict]) -> list[dict]:
"""Expand eval selection to every fixed-sequence entry.
"""Expand eval selection to every 8k1k fixed-sequence entry.

Evals only run at 8k1k (matching mark_eval_entries), so entries at other
sequence lengths (e.g. 1k1k) are passed through untouched rather than
expanded into eval rows.
Agentic entries are left untouched because they do not support lm-eval.
Multi-node rows with the same engine topology are merged into one eval row
whose full concurrency list is run sequentially against the same engine.
"""
expanded_entries: list[dict] = []
multinode_indices: dict[tuple, int] = {}

target_isl, target_osl = seq_len_stoi["8k1k"]

for entry in matrix_values:
if entry.get(Fields.SCENARIO_TYPE.value) == 'agentic-coding':
expanded_entries.append(entry)
continue

# Only 8k1k is eligible for evals; leave other sequence lengths as-is
# (their RUN_EVAL stays False, so the evals-only filter drops them).
if (
entry.get(Fields.ISL.value) != target_isl
or entry.get(Fields.OSL.value) != target_osl
):
expanded_entries.append(entry)
continue

if Fields.PREFILL.value in entry:
conc = entry[Fields.CONC.value]
conc_values = conc if isinstance(conc, list) else [conc]
Expand Down
Loading