diff --git a/.github/workflows/benchmark-multinode-tmpl.yml b/.github/workflows/benchmark-multinode-tmpl.yml index e58cff478..4ca9be53c 100644 --- a/.github/workflows/benchmark-multinode-tmpl.yml +++ b/.github/workflows/benchmark-multinode-tmpl.yml @@ -312,8 +312,15 @@ jobs: if-no-files-found: ${{ inputs.eval-only && 'error' || 'ignore' }} - name: Verify eval scores - if: ${{ (success() || failure()) && inputs.eval-only }} - run: python3 utils/evals/validate_scores.py + if: ${{ (success() || failure()) && (inputs.run-eval || inputs.eval-only) }} + run: | + expected_concs="${EVAL_CONC}" + if [[ -z "${expected_concs}" ]]; then + expected_concs="$( + printf '%s\n' "${CONC_LIST}" | tr ' ' '\n' | sort -n | tail -1 + )" + fi + python3 utils/evals/validate_scores.py --expected-concs "${expected_concs}" - name: Cleanup eval outputs (post-upload) if: ${{ always() && (inputs.run-eval || inputs.eval-only) }} diff --git a/.github/workflows/benchmark-tmpl.yml b/.github/workflows/benchmark-tmpl.yml index a57e89725..d6d982bd5 100644 --- a/.github/workflows/benchmark-tmpl.yml +++ b/.github/workflows/benchmark-tmpl.yml @@ -302,8 +302,8 @@ jobs: if-no-files-found: ${{ inputs.eval-only && 'error' || 'ignore' }} - name: Verify eval scores - if: ${{ (success() || failure()) && inputs.eval-only }} - run: python3 utils/evals/validate_scores.py + if: ${{ (success() || failure()) && (inputs.run-eval || inputs.eval-only) }} + run: python3 utils/evals/validate_scores.py --expected-concs "${CONC}" - name: Cleanup eval outputs (post-upload) if: ${{ always() && (env.RUN_EVAL == 'true' || inputs.eval-only) }} diff --git a/.github/workflows/run-sweep.yml b/.github/workflows/run-sweep.yml index 2aa917622..65be1552a 100644 --- a/.github/workflows/run-sweep.yml +++ b/.github/workflows/run-sweep.yml @@ -438,7 +438,8 @@ jobs: conc: ${{ matrix.config.conc }} spec-decoding: ${{ matrix.config.spec-decoding }} disagg: ${{ matrix.config.disagg }} - run-eval: ${{ matrix.config.run-eval }} + # Accuracy runs have their own eval-only matrices below. + run-eval: false sweep-single-node-8k1k: needs: [setup, canary-select, canary-sweep] diff --git a/.github/workflows/test-changelog-gate.yml b/.github/workflows/test-changelog-gate.yml index d46a6aaac..950053326 100644 --- a/.github/workflows/test-changelog-gate.yml +++ b/.github/workflows/test-changelog-gate.yml @@ -8,19 +8,22 @@ on: paths: - ".claude/commands/recover-failed-ingest.md" - ".github/workflows/benchmark-multinode-tmpl.yml" + - ".github/workflows/benchmark-tmpl.yml" + - ".github/workflows/collect-evals.yml" - ".github/workflows/e2e-tests.yml" - ".github/workflows/run-sweep.yml" - ".github/workflows/test-changelog-gate.yml" - "benchmarks/benchmark_lib.sh" - - "benchmarks/multi_node/amd_utils/job.slurm" + - "benchmarks/multi_node/amd_utils/**" + - "benchmarks/single_node/**" + - "runners/launch_*.sh" - "utils/find_reusable_sweep_run.py" - "utils/test_find_reusable_sweep_run.py" - "utils/process_changelog.py" - "utils/test_process_changelog.py" - "utils/collect_eval_results.py" - "utils/test_collect_eval_results.py" - - "utils/evals/validate_scores.py" - - "utils/evals/test_batched_eval.py" + - "utils/evals/**" - "utils/prepare_perf_changelog_merge.py" - "utils/recover_failed_ingest.py" - "utils/changelog_gate_tests/test_prepare_perf_changelog_merge.py" diff --git a/benchmarks/benchmark_lib.sh b/benchmarks/benchmark_lib.sh index 95e063a3d..aed65d8b8 100644 --- a/benchmarks/benchmark_lib.sh +++ b/benchmarks/benchmark_lib.sh @@ -752,6 +752,7 @@ _stage_lm_eval_artifacts() { local results_dir="$1" local eval_conc="$2" local moved=0 + local result_count=0 local failed=0 local jf base stem extension target suffix @@ -788,6 +789,9 @@ _stage_lm_eval_artifacts() { if mv -f "$jf" "$target"; then moved=1 + if [[ "$base" == results*.json ]]; then + result_count=$((result_count + 1)) + fi else echo "WARN: failed to stage eval artifact ${jf}" >&2 failed=1 @@ -805,6 +809,10 @@ _stage_lm_eval_artifacts() { echo "WARN: no eval artifacts were produced for concurrency ${eval_conc}" >&2 return 1 fi + if [ "$result_count" -eq 0 ]; then + echo "WARN: no results*.json artifact was produced for concurrency ${eval_conc}" >&2 + return 1 + fi return "$failed" } @@ -834,6 +842,12 @@ append_lm_eval_summary() { local meta_json local metadata_conc="${CONC:-1}" local batch_metadata="" + local eval_exit_code="${EVAL_RUN_EXIT_CODE:-0}" + + if [[ ! "$eval_exit_code" =~ ^[0-9]+$ ]]; then + echo "ERROR: invalid eval exit code '${eval_exit_code}'" >&2 + return 1 + fi if [ -n "$batch_concs" ]; then meta_json="./meta_env.json" @@ -864,8 +878,13 @@ append_lm_eval_summary() { meta_json="${out_dir}/meta_env.json" fi + if [[ ! "$metadata_conc" =~ ^[1-9][0-9]*$ ]]; then + echo "ERROR: invalid eval metadata concurrency '${metadata_conc}'" >&2 + return 1 + fi + # Write minimal meta for collectors that expect it - local model_name="${MODEL_NAME:-$MODEL}" + local model_name="${MODEL_NAME:-${MODEL:-}}" local is_multinode_json="false" if [ "${IS_MULTINODE:-false}" = "true" ]; then is_multinode_json="true" @@ -899,7 +918,7 @@ append_lm_eval_summary() { local fw="${FRAMEWORK:-}" local prec="${PRECISION:-}" if [[ -z "$fw" || -z "$prec" ]]; then - if [[ -n "${RESULT_FILENAME}" ]]; then + if [[ -n "${RESULT_FILENAME:-}" ]]; then # Extract the two fields immediately before "_tp" # Handles arbitrary underscores in exp_name by matching from the end local parsed @@ -914,12 +933,13 @@ append_lm_eval_summary() { fi fi fi - cat > "${meta_json}" < "${meta_json}" <&2 + return 1 + fi + + if ! python3 -m json.tool "${meta_json}" >/dev/null 2>&1; then + echo "ERROR: generated invalid eval metadata at ${meta_json}" >&2 + rm -f "${meta_json}" || true + return 1 + fi if [ -n "$batch_concs" ]; then echo "Prepared batched eval artifacts in: $(pwd)" @@ -946,26 +976,94 @@ META fi # Move eval artifacts into PWD (no new directories in workspace) + local artifact_rc=0 + local result_count=0 + local jf base if [ -f "${meta_json}" ]; then - mv -f "${meta_json}" ./ || echo "WARN: failed to move ${meta_json}" >&2 + if ! mv -f "${meta_json}" ./; then + echo "ERROR: failed to move ${meta_json}" >&2 + artifact_rc=1 + fi fi if [ -d "${out_dir}" ]; then while IFS= read -r -d '' jf; do base=$(basename "$jf") if [ "$base" != "meta_env.json" ]; then - mv -f "$jf" ./ || echo "WARN: failed to move ${jf}" >&2 + if mv -f "$jf" ./; then + if [[ "$base" == results*.json ]]; then + result_count=$((result_count + 1)) + fi + else + echo "ERROR: failed to move ${jf}" >&2 + artifact_rc=1 + fi fi - done < <(find "${out_dir}" -type f -name "*.json*" -print0 2>/dev/null) + done < <( + find "${out_dir}" -type f \ + \( -name "*.json" -o -name "*.jsonl" \) -print0 2>/dev/null + ) + fi + if [ "$result_count" -eq 0 ]; then + echo "ERROR: eval produced no results*.json artifact" >&2 + artifact_rc=1 fi - # Best-effort cleanup of the temp directory - if [ -n "${out_dir}" ] && [ -d "${out_dir}" ]; then + # Preserve the temp directory when staging fails so the caller can inspect it. + if [ "$artifact_rc" -eq 0 ] && [ -n "${out_dir}" ] && [ -d "${out_dir}" ]; then rm -rf --one-file-system "${out_dir}" || rm -rf "${out_dir}" || true fi + if [ "$artifact_rc" -ne 0 ]; then + echo "ERROR: eval artifact staging was incomplete" >&2 + return "$artifact_rc" + fi + echo "Moved eval artifacts to: $(pwd)" } +_copy_lm_eval_artifacts() { + local source_dir="$1" + local destination_dir="$2" + local artifact_path base + local result_count=0 + local copy_rc=0 + + if ! mkdir -p "$destination_dir"; then + echo "ERROR: failed to create eval artifact directory ${destination_dir}" >&2 + return 1 + fi + + if [ ! -f "${source_dir}/meta_env.json" ]; then + echo "ERROR: missing ${source_dir}/meta_env.json" >&2 + copy_rc=1 + elif ! cp -f "${source_dir}/meta_env.json" "$destination_dir/"; then + echo "ERROR: failed to copy ${source_dir}/meta_env.json" >&2 + copy_rc=1 + fi + + while IFS= read -r -d '' artifact_path; do + base=$(basename "$artifact_path") + if cp -f "$artifact_path" "$destination_dir/"; then + if [[ "$base" == results*.json ]]; then + result_count=$((result_count + 1)) + fi + else + echo "ERROR: failed to copy eval artifact ${artifact_path}" >&2 + copy_rc=1 + fi + done < <( + find "$source_dir" -maxdepth 1 -type f \ + \( -name "results*.json" -o -name "sample*.jsonl" \) \ + -print0 2>/dev/null + ) + + if [ "$result_count" -eq 0 ]; then + echo "ERROR: no results*.json artifacts found in ${source_dir}" >&2 + copy_rc=1 + fi + return "$copy_rc" +} + # ------------------------------ # Unified eval entrypoint # ------------------------------ @@ -989,29 +1087,46 @@ run_eval() { unset EVAL_BATCHED_CONCS unset EVAL_BATCHED_COMPLETED_CONCS unset EVAL_BATCHED_FAILED_CONCS + export EVAL_RUN_EXIT_CODE=0 local requested_concs="${EVAL_CONCURRENT_REQUESTS:-}" + if [ -z "$requested_concs" ] && [ -n "${CONC:-}" ]; then + requested_concs="$CONC" + fi + requested_concs="${requested_concs//$'\n'/ }" + local eval_concs=() if [ -n "$requested_concs" ]; then read -r -a eval_concs <<< "$requested_concs" fi + local eval_conc seen_concs=" " + for eval_conc in "${eval_concs[@]}"; do + if [[ ! "$eval_conc" =~ ^[1-9][0-9]*$ ]]; then + echo "ERROR: invalid eval concurrency '${eval_conc}'" >&2 + export EVAL_RUN_EXIT_CODE=2 + return 2 + fi + if [[ "$seen_concs" == *" ${eval_conc} "* ]]; then + echo "ERROR: duplicate eval concurrency '${eval_conc}'" >&2 + export EVAL_RUN_EXIT_CODE=2 + return 2 + fi + seen_concs="${seen_concs}${eval_conc} " + done + if [ "${#eval_concs[@]}" -gt 1 ]; then if [[ "$framework" != "lm-eval" && "$framework" != "lm_eval" ]]; then echo "ERROR: batched eval concurrency is only supported for lm-eval" >&2 + export EVAL_RUN_EXIT_CODE=2 return 1 fi - local eval_conc results_dir eval_rc stage_rc + local results_dir eval_rc stage_rc local completed_concs=() local failed_concs=() for eval_conc in "${eval_concs[@]}"; do - if [[ ! "$eval_conc" =~ ^[1-9][0-9]*$ ]]; then - echo "ERROR: invalid eval concurrency '${eval_conc}'" >&2 - return 1 - fi - if ! results_dir=$(mktemp -d /tmp/eval_out-conc"${eval_conc}"-XXXXXX); then echo "ERROR: failed to create eval output directory for concurrency ${eval_conc}" >&2 failed_concs+=("$eval_conc") @@ -1043,8 +1158,13 @@ run_eval() { export EVAL_BATCHED_FAILED_CONCS="${failed_concs[*]}" if [ "${#failed_concs[@]}" -gt 0 ]; then + export EVAL_RUN_EXIT_CODE=1 echo "ERROR: batched eval failed for concurrency: ${failed_concs[*]}" >&2 - echo "Deferring failure until post-upload score validation preserves all artifacts" >&2 + if [ "${EVAL_ONLY:-false}" = "true" ]; then + echo "Deferring failure until post-upload score validation preserves all artifacts" >&2 + return 0 + fi + return 1 fi return 0 fi @@ -1055,11 +1175,12 @@ run_eval() { *) echo "Unknown framework '${framework}'"; eval_rc=1 ;; esac + export EVAL_RUN_EXIT_CODE="$eval_rc" if [ "$eval_rc" -ne 0 ]; then echo "ERROR: run_eval failed with exit code $eval_rc" >&2 - if [ "${EVAL_ONLY}" = "true" ]; then - echo "Eval-only mode: failing after artifact collection" >&2 - return "$eval_rc" + if [ "${EVAL_ONLY:-false}" = "true" ]; then + echo "Deferring failure until post-upload score validation preserves artifacts" >&2 + return 0 fi fi return $eval_rc diff --git a/benchmarks/multi_node/amd_utils/env_atom.sh b/benchmarks/multi_node/amd_utils/env_atom.sh index 52f81b7d6..3beef4111 100644 --- a/benchmarks/multi_node/amd_utils/env_atom.sh +++ b/benchmarks/multi_node/amd_utils/env_atom.sh @@ -34,7 +34,10 @@ export IBDEVICES export SAFETENSORS_FAST_GPU=1 export VLLM_LOG_LEVEL=WARNING -export ATOM_LOG_LEVEL=WARNING +export ATOM_LOG_LEVEL="${ATOM_LOG_LEVEL:-WARNING}" +export ATOM_UVICORN_LOG_LEVEL="${ATOM_UVICORN_LOG_LEVEL:-warning}" +export ATOM_UVICORN_ACCESS_LOG="${ATOM_UVICORN_ACCESS_LOG:-0}" +export ATOMESH_LOG_LEVEL="${ATOMESH_LOG_LEVEL:-warn}" export AITER_LOG_LEVEL=WARNING export LOG_LEVEL=WARNING export LOGLEVEL=WARNING diff --git a/benchmarks/multi_node/amd_utils/job.slurm b/benchmarks/multi_node/amd_utils/job.slurm index 17f5b4f54..bb55eb847 100755 --- a/benchmarks/multi_node/amd_utils/job.slurm +++ b/benchmarks/multi_node/amd_utils/job.slurm @@ -304,6 +304,11 @@ export ENGINE=$ENGINE export RUN_EVAL="${RUN_EVAL:-false}" export EVAL_ONLY="${EVAL_ONLY:-false}" export EVAL_CONC="${EVAL_CONC:-}" +export EVAL_FRAMEWORK="${EVAL_FRAMEWORK:-}" +export EVAL_TASKS_DIR="${EVAL_TASKS_DIR:-}" +export EVAL_MAX_MODEL_LEN="${EVAL_MAX_MODEL_LEN:-}" +export MODEL="${MODEL:-}" +export MAX_MODEL_LEN="${MAX_MODEL_LEN:-}" export FRAMEWORK="${FRAMEWORK:-}" export PRECISION="${PRECISION:-}" export MODEL_PREFIX="${MODEL_PREFIX:-}" @@ -354,6 +359,9 @@ DOCKER_ENV_COMMON=( -e NODE0_ADDR=\$NODE0_ADDR -e MODEL_DIR=/models -e MODEL_NAME=\$MODEL_NAME + -e MODEL=\$MODEL + -e MODEL_PATH=$DOCKER_MODEL_PATH + -e MAX_MODEL_LEN=\$MAX_MODEL_LEN -e GPUS_PER_NODE=\$GPUS_PER_NODE -e xP=\$xP -e yD=\$yD @@ -370,7 +378,9 @@ DOCKER_ENV_COMMON=( -e WS_PATH=${WS_PATH} -e RUN_EVAL=\$RUN_EVAL -e EVAL_ONLY=\$EVAL_ONLY - -e EVAL_CONC + -e EVAL_FRAMEWORK=\$EVAL_FRAMEWORK + -e EVAL_TASKS_DIR=\$EVAL_TASKS_DIR + -e EVAL_MAX_MODEL_LEN=\$EVAL_MAX_MODEL_LEN -e FRAMEWORK=\$FRAMEWORK -e PRECISION=\$PRECISION -e MODEL_PREFIX=\$MODEL_PREFIX @@ -417,6 +427,10 @@ elif [[ "$ENGINE" == "atom-disagg" ]]; then -e MAX_NUM_SEQS=${MAX_NUM_SEQS:-256} -e EXTRA_SERVER_ARGS=\${EXTRA_SERVER_ARGS:-} -e IBDEVICES=${IBDEVICES:-} + -e ATOM_LOG_LEVEL=\${ATOM_LOG_LEVEL:-WARNING} + -e ATOM_UVICORN_LOG_LEVEL=\${ATOM_UVICORN_LOG_LEVEL:-warning} + -e ATOM_UVICORN_ACCESS_LOG=\${ATOM_UVICORN_ACCESS_LOG:-0} + -e ATOMESH_LOG_LEVEL=\${ATOMESH_LOG_LEVEL:-warn} ) else DOCKER_ENV_ENGINE=( @@ -585,6 +599,7 @@ fi -v ${DI_REPO_DIR}:${DOCKER_MOUNT_PATH} \ ${EXTRA_DOCKER_MOUNTS:-} \ \${RDMA_MOUNTS[@]+"\${RDMA_MOUNTS[@]}"} \ + -e \"EVAL_CONC=\$EVAL_CONC\" \ ${DOCKER_ENV_COMMON[*]} \ ${DOCKER_ENV_ENGINE[*]} \ --name \"$DOCKER_CONT_NAME\" \ diff --git a/benchmarks/multi_node/amd_utils/server_atom.sh b/benchmarks/multi_node/amd_utils/server_atom.sh index 957c84d60..601c6de78 100644 --- a/benchmarks/multi_node/amd_utils/server_atom.sh +++ b/benchmarks/multi_node/amd_utils/server_atom.sh @@ -29,11 +29,11 @@ IPADDRS="${IPADDRS:-localhost}" # Parallelism PREFILL_TP_SIZE="${PREFILL_TP_SIZE:-8}" -PREFILL_ENABLE_EP="${PREFILL_ENABLE_EP}" -PREFILL_ENABLE_DP="${PREFILL_ENABLE_DP}" +PREFILL_ENABLE_EP="${PREFILL_ENABLE_EP:-false}" +PREFILL_ENABLE_DP="${PREFILL_ENABLE_DP:-false}" DECODE_TP_SIZE="${DECODE_TP_SIZE:-8}" -DECODE_ENABLE_EP="${DECODE_ENABLE_EP}" -DECODE_ENABLE_DP="${DECODE_ENABLE_DP}" +DECODE_ENABLE_EP="${DECODE_ENABLE_EP:-false}" +DECODE_ENABLE_DP="${DECODE_ENABLE_DP:-false}" # ATOM server ports (different from SGLang which uses 8000 for all) PREFILL_PORT="${PREFILL_PORT:-8010}" @@ -63,8 +63,14 @@ GPUS_PER_NODE="${GPUS_PER_NODE:-8}" # Dependencies and Environment Setup # ============================================================================= -source $ATOM_WS_PATH/setup_deps.sh -source $ATOM_WS_PATH/env_atom.sh +if ! source "$ATOM_WS_PATH/setup_deps.sh"; then + echo "ERROR: failed to initialize ATOM dependencies" >&2 + exit 1 +fi +if ! source "$ATOM_WS_PATH/env_atom.sh"; then + echo "ERROR: failed to initialize ATOM environment" >&2 + exit 1 +fi host_ip=$(ip route get 1.1.1.1 2>/dev/null | awk '/src/ {print $7}') if [[ -z "$host_ip" ]]; then @@ -103,28 +109,21 @@ done echo "Prefill IPs : ${PREFILL_IPS[*]}" echo "Decode IPs : ${DECODE_IPS[*]}" -PREFILL_ENABLE_EP="${PREFILL_ENABLE_EP}" -PREFILL_ENABLE_DP="${PREFILL_ENABLE_DP}" -DECODE_ENABLE_EP="${DECODE_ENABLE_EP}" -DECODE_ENABLE_DP="${DECODE_ENABLE_DP}" - PREFILL_PARALLEL_ARGS=(-tp "$PREFILL_TP_SIZE") #TP -if [ "$PREFILL_ENABLE_DP" = "true" ]; then - if [ "$PREFILL_ENABLE_EP" -gt 1 ]; then #DPA+EP - PREFILL_PARALLEL_ARGS=(-tp "$PREFILL_TP_SIZE" --enable-expert-parallel --enable-dp-attention ) - else #DPA+TP - PREFILL_PARALLEL_ARGS=(-tp "$PREFILL_TP_SIZE" --enable-dp-attention ) - fi -fi - -DECODE_PARALLEL_ARGS=(-tp "$PREFILL_TP_SIZE") #TP -if [ "$DECODE_ENABLE_DP" = "true" ]; then - if [ "$DECODE_ENABLE_EP" -gt 1 ]; then #DPA+EP - DECODE_PARALLEL_ARGS=(-tp "$DECODE_TP_SIZE" --enable-expert-parallel --enable-dp-attention ) - else #DPA+TP - DECODE_PARALLEL_ARGS=(-tp "$DECODE_TP_SIZE" --enable-dp-attention ) - fi -fi +if [[ "$PREFILL_ENABLE_EP" == "true" ]]; then + PREFILL_PARALLEL_ARGS+=(--enable-expert-parallel) +fi +if [[ "$PREFILL_ENABLE_DP" == "true" ]]; then + PREFILL_PARALLEL_ARGS+=(--enable-dp-attention) +fi + +DECODE_PARALLEL_ARGS=(-tp "$DECODE_TP_SIZE") #TP +if [[ "$DECODE_ENABLE_EP" == "true" ]]; then + DECODE_PARALLEL_ARGS+=(--enable-expert-parallel) +fi +if [[ "$DECODE_ENABLE_DP" == "true" ]]; then + DECODE_PARALLEL_ARGS+=(--enable-dp-attention) +fi echo "Prefill Parallel args : ${PREFILL_PARALLEL_ARGS[*]}" echo "Decode Parallel args : ${DECODE_PARALLEL_ARGS[*]}" @@ -213,7 +212,7 @@ if [ "$NODE_RANK" -eq 0 ]; then ${DECODE_ARGS} \ --policy random \ --backend atom \ - --log-level info \ + --log-level ${ATOMESH_LOG_LEVEL} \ --disable-health-check \ --disable-circuit-breaker \ --prometheus-port 29100" @@ -303,26 +302,33 @@ if [ "$NODE_RANK" -eq 0 ]; then else export TP="${PREFILL_TP_SIZE}" export CONC="${EVAL_CONCURRENT_REQUESTS}" + export EP_SIZE=1 + [[ "${PREFILL_ENABLE_EP}" == "true" ]] && EP_SIZE="${PREFILL_TP_SIZE}" export PREFILL_TP="${PREFILL_TP_SIZE}" export PREFILL_EP=1 + [[ "${PREFILL_ENABLE_EP}" == "true" ]] && PREFILL_EP="${PREFILL_TP_SIZE}" export PREFILL_NUM_WORKERS="${xP}" export DECODE_TP="${DECODE_TP_SIZE}" export DECODE_EP=1 + [[ "${DECODE_ENABLE_EP}" == "true" ]] && DECODE_EP="${DECODE_TP_SIZE}" export DECODE_NUM_WORKERS="${yD}" + export DP_ATTENTION="${PREFILL_ENABLE_DP}" + export PREFILL_DP_ATTENTION="${PREFILL_ENABLE_DP}" + export DECODE_DP_ATTENTION="${DECODE_ENABLE_DP}" export ISL="${BENCH_INPUT_LEN}" export OSL="${BENCH_OUTPUT_LEN}" - MODEL_NAME="${MODEL_DIR}/${MODEL_NAME}" append_lm_eval_summary - EVAL_COPY_DIR="/run_logs/slurm_job-${SLURM_JOB_ID}/eval_results" - mkdir -p "$EVAL_COPY_DIR" - for f in meta_env.json; do - [ -e "/workspace/$f" ] && cp -f "/workspace/$f" "$EVAL_COPY_DIR/" - done - find /workspace -maxdepth 1 -name 'results*.json' -exec cp -f {} "$EVAL_COPY_DIR/" \; - find /workspace -maxdepth 1 -name 'sample*.jsonl' -exec cp -f {} "$EVAL_COPY_DIR/" \; - - echo "Eval completed. Artifacts staged in $EVAL_COPY_DIR" + if ! MODEL_NAME="${MODEL_DIR}/${MODEL_NAME}" append_lm_eval_summary; then + echo "ERROR: failed to finalize eval artifacts" >&2 + EVAL_FAILED=1 + fi + if ! _copy_lm_eval_artifacts /workspace "$EVAL_COPY_DIR"; then + echo "ERROR: failed to stage eval artifacts in $EVAL_COPY_DIR" >&2 + EVAL_FAILED=1 + else + echo "Eval completed. Artifacts staged in $EVAL_COPY_DIR" + fi fi fi diff --git a/benchmarks/multi_node/amd_utils/server_sglang.sh b/benchmarks/multi_node/amd_utils/server_sglang.sh index c28ccab41..0f7fd2fc3 100755 --- a/benchmarks/multi_node/amd_utils/server_sglang.sh +++ b/benchmarks/multi_node/amd_utils/server_sglang.sh @@ -597,20 +597,17 @@ if [ "$NODE_RANK" -eq 0 ]; then # IS_MULTINODE, FRAMEWORK, PRECISION, MODEL_PREFIX, RUNNER_TYPE, # RESULT_FILENAME are already set via Docker -e flags from job.slurm - append_lm_eval_summary - # Files (meta_env.json, results*.json, sample*.jsonl) are now in /workspace - - # Copy eval artifacts to run_logs for NFS extraction by runner EVAL_COPY_DIR="/run_logs/slurm_job-${SLURM_JOB_ID}/eval_results" - mkdir -p "$EVAL_COPY_DIR" - for f in meta_env.json; do - [ -e "/workspace/$f" ] && cp -f "/workspace/$f" "$EVAL_COPY_DIR/" - done - # Use find for glob patterns to avoid "no match" errors - find /workspace -maxdepth 1 -name 'results*.json' -exec cp -f {} "$EVAL_COPY_DIR/" \; - find /workspace -maxdepth 1 -name 'sample*.jsonl' -exec cp -f {} "$EVAL_COPY_DIR/" \; - - echo "Eval completed. Artifacts staged in $EVAL_COPY_DIR" + if ! append_lm_eval_summary; then + echo "ERROR: failed to finalize eval artifacts" >&2 + EVAL_FAILED=1 + fi + if ! _copy_lm_eval_artifacts /workspace "$EVAL_COPY_DIR"; then + echo "ERROR: failed to stage eval artifacts in $EVAL_COPY_DIR" >&2 + EVAL_FAILED=1 + else + echo "Eval completed. Artifacts staged in $EVAL_COPY_DIR" + fi fi fi diff --git a/benchmarks/multi_node/amd_utils/server_vllm.sh b/benchmarks/multi_node/amd_utils/server_vllm.sh index d61fe0359..d8871a17e 100755 --- a/benchmarks/multi_node/amd_utils/server_vllm.sh +++ b/benchmarks/multi_node/amd_utils/server_vllm.sh @@ -372,17 +372,17 @@ if [ "$NODE_RANK" -eq 0 ]; then export ISL="${BENCH_INPUT_LEN}" export OSL="${BENCH_OUTPUT_LEN}" - append_lm_eval_summary - EVAL_COPY_DIR="/run_logs/slurm_job-${SLURM_JOB_ID}/eval_results" - mkdir -p "$EVAL_COPY_DIR" - for f in meta_env.json; do - [ -e "/workspace/$f" ] && cp -f "/workspace/$f" "$EVAL_COPY_DIR/" - done - find /workspace -maxdepth 1 -name 'results*.json' -exec cp -f {} "$EVAL_COPY_DIR/" \; - find /workspace -maxdepth 1 -name 'sample*.jsonl' -exec cp -f {} "$EVAL_COPY_DIR/" \; - - echo "Eval completed. Artifacts staged in $EVAL_COPY_DIR" + if ! append_lm_eval_summary; then + echo "ERROR: failed to finalize eval artifacts" >&2 + EVAL_FAILED=1 + fi + if ! _copy_lm_eval_artifacts /workspace "$EVAL_COPY_DIR"; then + echo "ERROR: failed to stage eval artifacts in $EVAL_COPY_DIR" >&2 + EVAL_FAILED=1 + else + echo "Eval completed. Artifacts staged in $EVAL_COPY_DIR" + fi fi fi diff --git a/benchmarks/multi_node/amd_utils/setup_deps.sh b/benchmarks/multi_node/amd_utils/setup_deps.sh index add2e3fa5..b094954f8 100644 --- a/benchmarks/multi_node/amd_utils/setup_deps.sh +++ b/benchmarks/multi_node/amd_utils/setup_deps.sh @@ -7,9 +7,10 @@ # (base image: vllm/vllm-openai-rocm:v0.18.0) # sglang-disagg -> SGLang aiter gluon patch + per-model installs # (base image: lmsysorg/sglang-rocm:v0.5.12-rocm720-mi35x-*) +# atom-disagg -> ATOM logging controls + shared aiter/model installs # -# Sourced by server_vllm.sh and server_sglang.sh so PATH / LD_LIBRARY_PATH -# exports persist. Each patch is idempotent: skipped if already applied. +# Sourced by the engine server launchers so PATH / LD_LIBRARY_PATH exports +# persist. Each patch is idempotent: skipped if already applied. # # Build steps run in subshells to avoid CWD pollution between installers. # ============================================================================= @@ -79,6 +80,284 @@ install_amd_quark() { _SETUP_INSTALLED+=("amd-quark") } +# --------------------------------------------------------------------------- +# 7. Make the pinned ATOM image honor InferenceX logging environment variables. +# +# ATOM's Python logger hardcodes its console handler to INFO, and its +# programmatic uvicorn.run() call enables INFO/access logs without CLI flags. +# Engine workers use multiprocessing "spawn", so changing only the parent +# process logger does not affect them. Upstream ATOM does not currently expose +# these controls. Patch the installed package once so all processes read the +# inherited environment: +# ATOM_LOG_LEVEL (default WARNING) +# ATOM_UVICORN_LOG_LEVEL (default warning) +# ATOM_UVICORN_ACCESS_LOG (default 0) +# atomesh has a native --log-level flag and does not need patching. +# +# This patch fails closed: after editing, it executes the patched getLogger() +# function with a root INFO handler installed and verifies that ATOM INFO is +# suppressed, WARNING is emitted exactly once, and propagation is disabled. It +# also executes the patched uvicorn.run() expression with a fake server and +# verifies the effective level/access-log arguments. +# --------------------------------------------------------------------------- +patch_atom_logging_controls() { + if ! python3 - <<'PY' +import ast +import contextlib +import importlib.util +import io +import logging +import os +import sys +import types +from pathlib import Path + +spec = importlib.util.find_spec("atom") +if spec is None or not spec.submodule_search_locations: + raise RuntimeError("could not locate the installed ATOM package") + +atom_dir = Path(next(iter(spec.submodule_search_locations))) +updates: dict[Path, str] = {} + +logger_path = atom_dir / "utils" / "__init__.py" +logger_src = logger_path.read_text() +logger_marker = "# InferenceX compatibility: make logging environment-driven." +if logger_marker not in logger_src: + old_logger_level = " logger.setLevel(logging.DEBUG)" + old_handler_level = " console_handler.setLevel(logging.INFO)" + if logger_src.count(old_logger_level) != 1: + raise RuntimeError( + f"{logger_path}: expected one hardcoded logger DEBUG level" + ) + if logger_src.count(old_handler_level) != 1: + raise RuntimeError( + f"{logger_path}: expected one hardcoded handler INFO level" + ) + new_logger_level = """ # InferenceX compatibility: make logging environment-driven. + _atom_log_level = getattr( + logging, + os.getenv("ATOM_LOG_LEVEL", "WARNING").upper(), + logging.WARNING, + ) + logger.setLevel(_atom_log_level) + # Prevent another framework's root handler from re-emitting ATOM INFO. + logger.propagate = False""" + logger_src = logger_src.replace( + old_logger_level, + new_logger_level, + 1, + ).replace( + old_handler_level, + " console_handler.setLevel(_atom_log_level)", + 1, + ) + updates[logger_path] = logger_src + +api_path = atom_dir / "entrypoints" / "openai" / "api_server.py" +api_src = api_path.read_text() +if "ATOM_UVICORN_ACCESS_LOG" not in api_src: + old = " uvicorn.run(app, host=args.host, port=args.server_port)" + if api_src.count(old) != 1: + raise RuntimeError(f"{api_path}: expected one uvicorn.run() call") + new = """ uvicorn.run( + app, + host=args.host, + port=args.server_port, + log_level=__import__("os").getenv( + "ATOM_UVICORN_LOG_LEVEL", "warning" + ).lower(), + access_log=__import__("os").getenv( + "ATOM_UVICORN_ACCESS_LOG", "0" + ).strip().lower() in {"1", "true", "yes", "on"}, + )""" + api_src = api_src.replace(old, new, 1) + updates[api_path] = api_src + +for path, contents in updates.items(): + compile(contents, str(path), "exec") + path.write_text(contents) + +logger_src = logger_path.read_text() +api_src = api_path.read_text() + +for required in ( + logger_marker, + 'os.getenv("ATOM_LOG_LEVEL", "WARNING")', + "logger.setLevel(_atom_log_level)", + "console_handler.setLevel(_atom_log_level)", + "logger.propagate = False", +): + if required not in logger_src: + raise RuntimeError(f"{logger_path}: missing logging control: {required}") + +for required in ( + '"ATOM_UVICORN_LOG_LEVEL", "warning"', + '"ATOM_UVICORN_ACCESS_LOG", "0"', + "access_log=", +): + if required not in api_src: + raise RuntimeError(f"{api_path}: missing logging control: {required}") + + +def verify_logger_behavior() -> None: + tree = ast.parse(logger_src, filename=str(logger_path)) + functions = [ + node + for node in tree.body + if isinstance(node, ast.FunctionDef) and node.name == "getLogger" + ] + if len(functions) != 1: + raise RuntimeError(f"{logger_path}: expected one getLogger() function") + + envs_module = types.ModuleType("atom.utils.envs") + envs_module.ATOM_LOG_MORE = False + atom_module = types.ModuleType("atom") + atom_module.__path__ = [] + utils_module = types.ModuleType("atom.utils") + utils_module.__path__ = [] + utils_module.envs = envs_module + module_backups = { + name: sys.modules.get(name) + for name in ("atom", "atom.utils", "atom.utils.envs") + } + sys.modules["atom"] = atom_module + sys.modules["atom.utils"] = utils_module + sys.modules["atom.utils.envs"] = envs_module + + atom_logger = logging.getLogger("atom") + old_handlers = atom_logger.handlers[:] + old_level = atom_logger.level + old_propagate = atom_logger.propagate + root_logger = logging.getLogger() + root_stream = io.StringIO() + root_handler = logging.StreamHandler(root_stream) + old_root_level = root_logger.level + root_logger.addHandler(root_handler) + root_logger.setLevel(logging.DEBUG) + + old_atom_level = os.environ.get("ATOM_LOG_LEVEL") + os.environ["ATOM_LOG_LEVEL"] = "WARNING" + atom_logger.handlers.clear() + try: + namespace = { + "logger": atom_logger, + "logging": logging, + "os": os, + "torch": types.SimpleNamespace( + _dynamo=types.SimpleNamespace(config=types.SimpleNamespace()) + ), + } + function_module = ast.Module(body=functions, type_ignores=[]) + ast.fix_missing_locations(function_module) + exec(compile(function_module, str(logger_path), "exec"), namespace) + + atom_stream = io.StringIO() + with contextlib.redirect_stderr(atom_stream): + configured_logger = namespace["getLogger"]() + for handler in configured_logger.handlers: + if isinstance(handler, logging.StreamHandler): + handler.setStream(atom_stream) + configured_logger.info("INFERENCEX_HIDDEN_ATOM_INFO") + configured_logger.warning("INFERENCEX_VISIBLE_ATOM_WARNING") + + output = atom_stream.getvalue() + if "INFERENCEX_HIDDEN_ATOM_INFO" in output: + raise RuntimeError("ATOM INFO logging is still enabled") + if output.count("INFERENCEX_VISIBLE_ATOM_WARNING") != 1: + raise RuntimeError("ATOM WARNING was not emitted exactly once") + if root_stream.getvalue(): + raise RuntimeError("ATOM logs still propagate to the root logger") + if configured_logger.getEffectiveLevel() != logging.WARNING: + raise RuntimeError("ATOM logger effective level is not WARNING") + if configured_logger.propagate: + raise RuntimeError("ATOM logger propagation is still enabled") + finally: + atom_logger.handlers[:] = old_handlers + atom_logger.setLevel(old_level) + atom_logger.propagate = old_propagate + root_logger.removeHandler(root_handler) + root_logger.setLevel(old_root_level) + if old_atom_level is None: + os.environ.pop("ATOM_LOG_LEVEL", None) + else: + os.environ["ATOM_LOG_LEVEL"] = old_atom_level + for name, module in module_backups.items(): + if module is None: + sys.modules.pop(name, None) + else: + sys.modules[name] = module + + +def verify_uvicorn_behavior() -> None: + tree = ast.parse(api_src, filename=str(api_path)) + calls = [ + node + for node in ast.walk(tree) + if isinstance(node, ast.Call) + and isinstance(node.func, ast.Attribute) + and isinstance(node.func.value, ast.Name) + and node.func.value.id == "uvicorn" + and node.func.attr == "run" + ] + if len(calls) != 1: + raise RuntimeError(f"{api_path}: expected one uvicorn.run() call") + + captured: dict[str, object] = {} + + class FakeUvicorn: + @staticmethod + def run(*args, **kwargs): + captured["args"] = args + captured["kwargs"] = kwargs + + old_level = os.environ.get("ATOM_UVICORN_LOG_LEVEL") + old_access = os.environ.get("ATOM_UVICORN_ACCESS_LOG") + os.environ["ATOM_UVICORN_LOG_LEVEL"] = "warning" + os.environ["ATOM_UVICORN_ACCESS_LOG"] = "0" + try: + expression = ast.Expression(body=calls[0]) + ast.fix_missing_locations(expression) + eval( + compile(expression, str(api_path), "eval"), + { + "app": object(), + "args": types.SimpleNamespace(host="0.0.0.0", server_port=8000), + "uvicorn": FakeUvicorn, + }, + ) + kwargs = captured.get("kwargs") + if not isinstance(kwargs, dict): + raise RuntimeError("patched uvicorn.run() was not executed") + if kwargs.get("log_level") != "warning": + raise RuntimeError("Uvicorn log level is not warning") + if kwargs.get("access_log") is not False: + raise RuntimeError("Uvicorn access logging is still enabled") + finally: + if old_level is None: + os.environ.pop("ATOM_UVICORN_LOG_LEVEL", None) + else: + os.environ["ATOM_UVICORN_LOG_LEVEL"] = old_level + if old_access is None: + os.environ.pop("ATOM_UVICORN_ACCESS_LOG", None) + else: + os.environ["ATOM_UVICORN_ACCESS_LOG"] = old_access + + +verify_logger_behavior() +verify_uvicorn_behavior() + +action = "Patched" if updates else "Verified" +print( + f"[SETUP] {action} ATOM logging controls: " + "engine=WARNING propagation=off uvicorn=warning access_log=off" +) +PY + then + return 1 + fi + _SETUP_INSTALLED+=("ATOM-logging-controls") +} + # --------------------------------------------------------------------------- # 8. Patch vLLM MoRI-IO save_kv_layer busy-spin (C128 tail-batch deadlock) # In WRITE mode, save_kv_layer spins forever waiting for the handshake @@ -739,6 +1018,16 @@ install_transformers_glm5() { # Run installers (engine-gated) # ============================================================================= +if [[ "$ENGINE" == "atom-disagg" ]]; then + if ! patch_atom_logging_controls; then + echo "[SETUP] ERROR: failed to enable ATOM logging controls" >&2 + if [[ "${BASH_SOURCE[0]}" != "$0" ]]; then + return 1 + fi + exit 1 + fi +fi + if [[ "$ENGINE" == "vllm-disagg" ]]; then install_recipe_deps install_amd_quark diff --git a/benchmarks/multi_node/amd_utils/submit.sh b/benchmarks/multi_node/amd_utils/submit.sh index fa3d65418..dc30cd4c3 100755 --- a/benchmarks/multi_node/amd_utils/submit.sh +++ b/benchmarks/multi_node/amd_utils/submit.sh @@ -129,6 +129,11 @@ export BENCH_RANDOM_RANGE_RATIO=${RANDOM_RANGE_RATIO:-0.8} export RUN_EVAL="${RUN_EVAL:-false}" export EVAL_ONLY="${EVAL_ONLY:-false}" export EVAL_CONC="${EVAL_CONC:-}" +export EVAL_FRAMEWORK="${EVAL_FRAMEWORK:-}" +export EVAL_TASKS_DIR="${EVAL_TASKS_DIR:-}" +export EVAL_MAX_MODEL_LEN="${EVAL_MAX_MODEL_LEN:-}" +export MODEL="${MODEL:-}" +export MAX_MODEL_LEN="${MAX_MODEL_LEN:-}" export FRAMEWORK="${FRAMEWORK:-}" export PRECISION="${PRECISION:-}" export MODEL_PREFIX="${MODEL_PREFIX:-}" diff --git a/runners/launch_b200-dgxc.sh b/runners/launch_b200-dgxc.sh index 3bd64eb9a..a6bac4d00 100644 --- a/runners/launch_b200-dgxc.sh +++ b/runners/launch_b200-dgxc.sh @@ -375,8 +375,13 @@ EOF shopt -s nullglob for eval_file in "$EVAL_DIR"/*; do [ -f "$eval_file" ] || continue - cp "$eval_file" "$GITHUB_WORKSPACE/" - echo "Copied eval artifact: $(basename "$eval_file")" + eval_dest="$GITHUB_WORKSPACE/$(basename "$eval_file")" + rm -f "$eval_dest" + if cp "$eval_file" "$eval_dest"; then + echo "Copied eval artifact: $(basename "$eval_file")" + else + echo "WARNING: Failed to copy eval artifact: $(basename "$eval_file")" >&2 + fi done shopt -u nullglob else diff --git a/runners/launch_b300-nv.sh b/runners/launch_b300-nv.sh index a941860c0..d49cc6ad7 100644 --- a/runners/launch_b300-nv.sh +++ b/runners/launch_b300-nv.sh @@ -277,8 +277,13 @@ if [[ "${RUN_EVAL:-false}" == "true" || "${EVAL_ONLY:-false}" == "true" ]]; then shopt -s nullglob for eval_file in "$EVAL_DIR"/*; do [ -f "$eval_file" ] || continue - cp "$eval_file" "$GITHUB_WORKSPACE/" - echo "Copied eval artifact: $(basename "$eval_file")" + eval_dest="$GITHUB_WORKSPACE/$(basename "$eval_file")" + rm -f "$eval_dest" + if cp "$eval_file" "$eval_dest"; then + echo "Copied eval artifact: $(basename "$eval_file")" + else + echo "WARNING: Failed to copy eval artifact: $(basename "$eval_file")" >&2 + fi done shopt -u nullglob else diff --git a/runners/launch_gb200-nv.sh b/runners/launch_gb200-nv.sh index 36c8af203..95fea2dd1 100755 --- a/runners/launch_gb200-nv.sh +++ b/runners/launch_gb200-nv.sh @@ -500,8 +500,13 @@ if [[ "${RUN_EVAL:-false}" == "true" || "${EVAL_ONLY:-false}" == "true" ]]; then shopt -s nullglob for eval_file in "$EVAL_DIR"/*; do [ -f "$eval_file" ] || continue - cp "$eval_file" "$GITHUB_WORKSPACE/" - echo "Copied eval artifact: $(basename "$eval_file")" + eval_dest="$GITHUB_WORKSPACE/$(basename "$eval_file")" + rm -f "$eval_dest" + if cp "$eval_file" "$eval_dest"; then + echo "Copied eval artifact: $(basename "$eval_file")" + else + echo "WARNING: Failed to copy eval artifact: $(basename "$eval_file")" >&2 + fi done shopt -u nullglob else diff --git a/runners/launch_h100-cr.sh b/runners/launch_h100-cr.sh index 08a5a967a..d4f97e85c 100644 --- a/runners/launch_h100-cr.sh +++ b/runners/launch_h100-cr.sh @@ -9,14 +9,63 @@ server_name="bmk-server" # the h200 launchers, which have carried SPEC_SUFFIX since #392). SPEC_SUFFIX=$([[ "$SPEC_DECODING" == "mtp" ]] && printf '_mtp' || printf '') +DOCKER_ENV_VARS=( + HF_TOKEN + HF_HUB_CACHE + EXP_NAME + IMAGE + MODEL + MODEL_PREFIX + TP + EP_SIZE + DP_ATTENTION + CONC + MAX_MODEL_LEN + ISL + OSL + FRAMEWORK + PRECISION + SPEC_DECODING + DISAGG + RUN_EVAL + EVAL_ONLY + EVAL_FRAMEWORK + EVAL_TASKS_DIR + EVAL_MAX_MODEL_LEN + OPENAI_API_KEY + RUNNER_TYPE + RESULT_FILENAME + RANDOM_RANGE_RATIO + SCENARIO_TYPE + SCENARIO_SUBDIR + IS_AGENTIC + OFFLOADING + TOTAL_CPU_DRAM_GB + DURATION + RESULT_DIR + PYTHONDONTWRITEBYTECODE + PYTHONPYCACHEPREFIX + PROFILE + SGLANG_TORCH_PROFILER_DIR + VLLM_TORCH_PROFILER_DIR + VLLM_RPC_TIMEOUT +) +DOCKER_ENV_ARGS=() +export PYTHONPYCACHEPREFIX="${PYTHONPYCACHEPREFIX:-/tmp/pycache/}" +for env_name in "${DOCKER_ENV_VARS[@]}"; do + DOCKER_ENV_ARGS+=(--env "$env_name") +done + set -x -docker run --rm --network=host --name=$server_name \ +docker run --rm --network=host --name="$server_name" \ --runtime=nvidia --gpus=all --ipc=host --privileged --shm-size=16g --ulimit memlock=-1 --ulimit stack=67108864 \ --v $HF_HUB_CACHE_MOUNT:$HF_HUB_CACHE \ --v $GITHUB_WORKSPACE:/workspace/ -w /workspace/ \ --e HF_TOKEN -e HF_HUB_CACHE -e MODEL -e TP -e CONC -e MAX_MODEL_LEN -e ISL -e OSL -e RUN_EVAL -e EVAL_ONLY -e RUNNER_TYPE -e RESULT_FILENAME -e RANDOM_RANGE_RATIO -e PORT=$PORT \ --e PROFILE -e SGLANG_TORCH_PROFILER_DIR -e VLLM_TORCH_PROFILER_DIR -e VLLM_RPC_TIMEOUT \ --e PYTHONPYCACHEPREFIX=/tmp/pycache/ -e TORCH_CUDA_ARCH_LIST="9.0" -e CUDA_DEVICE_ORDER=PCI_BUS_ID -e CUDA_VISIBLE_DEVICES="0,1,2,3,4,5,6,7" \ +-v "$HF_HUB_CACHE_MOUNT:$HF_HUB_CACHE" \ +-v "$GITHUB_WORKSPACE:/workspace/" -w /workspace/ \ +"${DOCKER_ENV_ARGS[@]}" \ +--env "PORT=$PORT" \ +--env TORCH_CUDA_ARCH_LIST="9.0" \ +--env CUDA_DEVICE_ORDER=PCI_BUS_ID \ +--env CUDA_VISIBLE_DEVICES="0,1,2,3,4,5,6,7" \ --entrypoint=/bin/bash \ -$IMAGE \ -benchmarks/single_node/${SCENARIO_SUBDIR}"${EXP_NAME%%_*}_${PRECISION}_h100${SPEC_SUFFIX}.sh" +"$IMAGE" \ +"benchmarks/single_node/${SCENARIO_SUBDIR}${EXP_NAME%%_*}_${PRECISION}_h100${SPEC_SUFFIX}.sh" diff --git a/runners/launch_h100-dgxc-slurm.sh b/runners/launch_h100-dgxc-slurm.sh index d1bbcef58..2fad08c5c 100644 --- a/runners/launch_h100-dgxc-slurm.sh +++ b/runners/launch_h100-dgxc-slurm.sh @@ -262,8 +262,13 @@ EOF shopt -s nullglob for eval_file in "$EVAL_DIR"/*; do [ -f "$eval_file" ] || continue - cp "$eval_file" "$GITHUB_WORKSPACE/" - echo "Copied eval artifact: $(basename "$eval_file")" + eval_dest="$GITHUB_WORKSPACE/$(basename "$eval_file")" + rm -f "$eval_dest" + if cp "$eval_file" "$eval_dest"; then + echo "Copied eval artifact: $(basename "$eval_file")" + else + echo "WARNING: Failed to copy eval artifact: $(basename "$eval_file")" >&2 + fi done shopt -u nullglob else diff --git a/runners/launch_h200-dgxc-slurm.sh b/runners/launch_h200-dgxc-slurm.sh index 572056956..f51c1908f 100755 --- a/runners/launch_h200-dgxc-slurm.sh +++ b/runners/launch_h200-dgxc-slurm.sh @@ -250,8 +250,13 @@ EOF shopt -s nullglob for eval_file in "$EVAL_DIR"/*; do [ -f "$eval_file" ] || continue - cp "$eval_file" "$GITHUB_WORKSPACE/" - echo "Copied eval artifact: $(basename "$eval_file")" + eval_dest="$GITHUB_WORKSPACE/$(basename "$eval_file")" + rm -f "$eval_dest" + if cp "$eval_file" "$eval_dest"; then + echo "Copied eval artifact: $(basename "$eval_file")" + else + echo "WARNING: Failed to copy eval artifact: $(basename "$eval_file")" >&2 + fi done shopt -u nullglob else diff --git a/runners/launch_mi355x-amds.sh b/runners/launch_mi355x-amds.sh index acfd4912a..d7200992b 100644 --- a/runners/launch_mi355x-amds.sh +++ b/runners/launch_mi355x-amds.sh @@ -164,7 +164,7 @@ PY fi # Extract eval results if eval was requested - if [[ "${RUN_EVAL:-false}" == "true" ]]; then + if [[ "${RUN_EVAL:-false}" == "true" || "${EVAL_ONLY:-false}" == "true" ]]; then # Find eval_results in the slurm job logs directory EVAL_DIR=$(find "$BENCHMARK_LOGS_DIR/logs" -type d -name eval_results 2>/dev/null | head -1) if [ -n "$EVAL_DIR" ] && [ -d "$EVAL_DIR" ]; then @@ -172,8 +172,13 @@ PY shopt -s nullglob for eval_file in "$EVAL_DIR"/*; do [ -f "$eval_file" ] || continue - cp "$eval_file" "$GITHUB_WORKSPACE/" - echo "Copied eval artifact: $(basename "$eval_file")" + eval_dest="$GITHUB_WORKSPACE/$(basename "$eval_file")" + rm -f "$eval_dest" + if cp "$eval_file" "$eval_dest"; then + echo "Copied eval artifact: $(basename "$eval_file")" + else + echo "WARNING: Failed to copy eval artifact: $(basename "$eval_file")" >&2 + fi done shopt -u nullglob else diff --git a/utils/collect_eval_results.py b/utils/collect_eval_results.py index 194fa4acb..522d5c08e 100644 --- a/utils/collect_eval_results.py +++ b/utils/collect_eval_results.py @@ -46,6 +46,23 @@ def result_concurrency(path: Path) -> Optional[int]: return int(match.group(1)) if match else None +def valid_concurrency_list( + value: object, + *, + allow_empty: bool = True, +) -> bool: + """Return whether metadata contains unique positive integer concurrencies.""" + return ( + isinstance(value, list) + and (allow_empty or bool(value)) + and all( + isinstance(conc, int) and not isinstance(conc, bool) and conc > 0 + for conc in value + ) + and len(set(value)) == len(value) + ) + + def detect_lm_eval_jsons(d: Path, batched: bool = False) -> List[Path]: """Return lm-eval result JSONs from one artifact directory. @@ -99,60 +116,104 @@ def extract_lm_metrics(json_path: Path) -> List[Dict[str, Any]]: - Values from results[task][metric,filter] """ data = load_json(json_path) or {} + if not isinstance(data, dict): + return [] results = data.get('results', {}) configs = data.get('configs', {}) - if not results: + if not isinstance(results, dict) or not results: return [] + if not isinstance(configs, dict): + configs = {} extracted = [] - for task in results.keys(): - task_results = results[task] + for task, task_results in results.items(): + if not isinstance(task_results, dict): + continue task_config = configs.get(task, {}) + if not isinstance(task_config, dict): + task_config = {} # Base metric: from config's metric_list metric_list = task_config.get('metric_list', []) - base_metric = metric_list[0]['metric'] if metric_list else 'exact_match' + if ( + isinstance(metric_list, list) + and metric_list + and isinstance(metric_list[0], dict) + and isinstance(metric_list[0].get('metric'), str) + ): + base_metric = metric_list[0]['metric'] + else: + base_metric = 'exact_match' # Filters: from config's filter_list filter_list = task_config.get('filter_list', []) + if not isinstance(filter_list, list): + filter_list = [] strict_val, strict_se = None, None flex_val, flex_se = None, None accuracy_val, accuracy_se = None, None # Helper to get value/stderr pair for filtered metrics - def get_val_se(filter_name: str) -> Tuple[Optional[float], Optional[float]]: - val_key = f"{base_metric},{filter_name}" - se_key = f"{base_metric}_stderr,{filter_name}" + def get_val_se( + filter_name: Optional[str], + ) -> Tuple[Optional[float], Optional[float]]: + suffix = f",{filter_name}" if filter_name else "" + val_key = f"{base_metric}{suffix}" + se_key = f"{base_metric}_stderr{suffix}" return task_results.get(val_key), task_results.get(se_key) # Extract metrics based on filter_list if not filter_list: - # No filters - check for accuracy or use base metric - if 'acc' in task_results: - accuracy_val = task_results.get('acc') - accuracy_se = task_results.get('acc_stderr') + value, stderr = get_val_se('none') + if value is None: + value, stderr = get_val_se(None) + if base_metric in {'acc', 'accuracy'}: + accuracy_val, accuracy_se = value, stderr else: - strict_val = task_results.get(base_metric) - strict_se = task_results.get(f"{base_metric}_stderr") + strict_val, strict_se = value, stderr else: # Extract metrics for each filter - for f in filter_list: - fname = f['name'] - if 'strict' in fname: - strict_val, strict_se = get_val_se(fname) - elif 'flex' in fname or 'extract' in fname: - flex_val, flex_se = get_val_se(fname) + for filter_config in filter_list: + if not isinstance(filter_config, dict): + continue + filter_name = filter_config.get('name') + if not isinstance(filter_name, str): + continue + value, stderr = get_val_se(filter_name) + normalized_name = filter_name.lower() + if 'strict' in normalized_name: + strict_val, strict_se = value, stderr + elif ( + 'flex' in normalized_name + or 'extract' in normalized_name + ): + flex_val, flex_se = value, stderr + elif base_metric in {'acc', 'accuracy'}: + accuracy_val, accuracy_se = value, stderr + elif strict_val is None: + strict_val, strict_se = value, stderr # N-samples (effective count) - n_eff = data.get('n-samples', {}).get(task, {}).get('effective') + sample_counts = data.get('n-samples', {}) + if not isinstance(sample_counts, dict): + sample_counts = {} + task_counts = sample_counts.get(task, {}) + n_eff = ( + task_counts.get('effective') + if isinstance(task_counts, dict) + else None + ) # Model name + metadata = task_config.get('metadata', {}) + if not isinstance(metadata, dict): + metadata = {} model = ( data.get('model_name') - or task_config.get('metadata', {}).get('model') + or metadata.get('model') ) extracted.append({ @@ -264,6 +325,10 @@ def build_row(meta: Dict[str, Any], m: Dict[str, Any]) -> Dict[str, Any]: row['score'] = m.get('strict') row['score_name'] = 'em_strict' row['score_se'] = m.get('strict_se') + elif m.get('flex') is not None: + row['score'] = m.get('flex') + row['score_name'] = 'em_flexible' + row['score_se'] = m.get('flex_se') elif m.get('accuracy') is not None: row['score'] = m.get('accuracy') row['score_name'] = 'accuracy' @@ -281,15 +346,48 @@ def collect_eval_rows(root: Path) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] for d in find_eval_sets(root): meta = load_json(d / 'meta_env.json') or {} + eval_exit_code = meta.get('eval_exit_code') + if ( + not isinstance(eval_exit_code, int) + or isinstance(eval_exit_code, bool) + or eval_exit_code != 0 + ): + continue + batch_concs = meta.get('eval_concs') - batched = isinstance(batch_concs, list) + if 'eval_concs' in meta and not valid_concurrency_list( + batch_concs, + allow_empty=False, + ): + continue + batched = valid_concurrency_list(batch_concs, allow_empty=False) allowed_concs: Optional[set[int]] = None if batched: - completed_concs = meta.get('completed_eval_concs', batch_concs) - if isinstance(completed_concs, list): - allowed_concs = {as_int(conc, -1) for conc in completed_concs} + completed_concs = meta.get('completed_eval_concs') + failed_concs = meta.get('failed_eval_concs') + if not ( + valid_concurrency_list(completed_concs) + and valid_concurrency_list(failed_concs) + ): + continue + expected_set = set(batch_concs) + completed_set = set(completed_concs) + failed_set = set(failed_concs) + if ( + completed_set != expected_set + or failed_set + or completed_set & failed_set + ): + continue + allowed_concs = completed_set + + lm_paths = detect_lm_eval_jsons(d, batched=batched) + if batched and { + result_concurrency(path) for path in lm_paths + } != allowed_concs: + continue - for lm_path in detect_lm_eval_jsons(d, batched=batched): + for lm_path in lm_paths: row_meta = meta if batched: conc = result_concurrency(lm_path) diff --git a/utils/evals/EVALS.md b/utils/evals/EVALS.md index 41247cf07..e18dc6bcc 100644 --- a/utils/evals/EVALS.md +++ b/utils/evals/EVALS.md @@ -90,9 +90,11 @@ Multi-node evals support two hardware paths: - `lm-eval` runner (`benchmarks/lm_eval.py`) is invoked by `do_sweep.py` as a post/eval-only step and sources InferenceX's `benchmark_lib.sh` from the mounted workspace (`/infmax-workspace`) - Eval artifacts written to `/logs/eval_results/` inside the container, collected by launch scripts - NVIDIA Slurm launch scripts always collect server logs for debugging but skip benchmark result collection when `EVAL_ONLY=true` -- Env vars threaded: `RUN_EVAL`, `EVAL_ONLY`, `IS_MULTINODE`, `FRAMEWORK`, `PRECISION`, `MODEL_PREFIX`, `RUNNER_TYPE`, `RESULT_FILENAME`, `SPEC_DECODING`, `ISL`, `OSL`, `PREFILL_TP/EP/NUM_WORKERS/DP_ATTN`, `DECODE_TP/EP/NUM_WORKERS/DP_ATTN`, `MODEL_NAME`, `EVAL_CONC` +- Env vars threaded: `RUN_EVAL`, `EVAL_ONLY`, `IS_MULTINODE`, `FRAMEWORK`, `PRECISION`, `MODEL_PREFIX`, `RUNNER_TYPE`, `RESULT_FILENAME`, `SPEC_DECODING`, `ISL`, `OSL`, `PREFILL_TP/EP/NUM_WORKERS/DP_ATTN`, `DECODE_TP/EP/NUM_WORKERS/DP_ATTN`, `MODEL_NAME`, `MODEL_PATH`, `MAX_MODEL_LEN`, `EVAL_CONC`, `EVAL_FRAMEWORK`, `EVAL_TASKS_DIR`, `EVAL_MAX_MODEL_LEN` -For multi-node `all-evals`, `EVAL_CONC` is a space-separated list. When it contains multiple values, `run_eval` runs those concurrency points sequentially against the same live engine, stages each result with a `_concN` filename suffix, and records expected/completed/failed points in `meta_env.json`. +For multi-node `all-evals`, `EVAL_CONC` is a space-separated list. When it contains multiple values, `run_eval` runs those concurrency points sequentially against the same live engine, stages each result with a `_concN` filename suffix, and records expected/completed/failed points in `meta_env.json`. The workflow passes its requested list independently to score validation, so missing metadata, missing concurrency results, result files without a checked score, and scores below threshold all fail the `Verify eval scores` step. + +AMD ATOM jobs default engine, Uvicorn, and atomesh output to warning level through `ATOM_LOG_LEVEL`, `ATOM_UVICORN_LOG_LEVEL`, and `ATOMESH_LOG_LEVEL`; Uvicorn access logs default off through `ATOM_UVICORN_ACCESS_LOG=0`. Only atomesh's `--log-level` is currently native upstream. The pinned ATOM image hardcodes its Python logger and `uvicorn.run()` defaults, so `setup_deps.sh` adds the other InferenceX compatibility controls to the installed package and behavior-tests them before serving. Startup fails if ATOM INFO can still reach either its handler or the root logger, or if Uvicorn access logging remains enabled. Set the levels to `INFO`/`info` and `ATOM_UVICORN_ACCESS_LOG=1` for verbose troubleshooting. ### Workflow structure - `e2e-tests.yml`: `test-sweep-evals` (single-node) and `test-sweep-multi-node-evals` (multi-node) @@ -129,7 +131,7 @@ cat ./evals/agg_eval_all.json | jq '[.[] | select(.hw == "B200")]' | Field | Description | |-------|-------------| -| `score` | Primary metric (exact match for GSM8K) | +| `score` | Primary task metric (`em_strict`, then `em_flexible`, then accuracy) | | `em_strict` | Strict exact match (requires `####` format) | | `em_flexible` | Flexible extraction (looser number matching) | | `n_eff` | Number of samples evaluated | @@ -148,7 +150,7 @@ cat ./evals/agg_eval_all.json | jq '[.[] | select(.hw == "B200")]' | `EVAL_CONCURRENT_REQUESTS` | `64` | Concurrent requests during eval; a space-separated list enables sequential batched evals against one live engine | ### Score validation -`utils/evals/validate_scores.py` checks eval results against thresholds in `utils/evals/thresholds.json`. Runs as a separate workflow step after artifact upload so results are preserved even if validation fails. +`utils/evals/validate_scores.py` checks eval results against thresholds in `utils/evals/thresholds.json`. The workflow supplies `--expected-concs`, and validation requires matching metadata plus at least one checked score in every result file. It runs as a separate workflow step after artifact upload so results are preserved even if validation fails. ### Adding a new eval task diff --git a/utils/evals/test_batched_eval.py b/utils/evals/test_batched_eval.py index f1ebb6b64..b5db22907 100644 --- a/utils/evals/test_batched_eval.py +++ b/utils/evals/test_batched_eval.py @@ -2,6 +2,7 @@ import json import os +import re import subprocess import sys from pathlib import Path @@ -93,6 +94,7 @@ def test_batched_eval_runs_every_concurrency_and_stages_results( assert meta["eval_concs"] == [1, 4, 8] assert meta["completed_eval_concs"] == [1, 4, 8] assert meta["failed_eval_concs"] == [] + assert meta["eval_exit_code"] == 0 assert sorted(path.name for path in tmp_path.glob("results*.json")) == [ "results_test_conc1.json", "results_test_conc4.json", @@ -101,6 +103,7 @@ def test_batched_eval_runs_every_concurrency_and_stages_results( assert validate_batch_manifest( str(tmp_path / "meta_env.json"), [str(path) for path in tmp_path.glob("results*.json")], + expected_concs=[1, 4, 8], ) == [] @@ -111,9 +114,11 @@ def test_batched_eval_preserves_partial_results_and_records_failure( assert meta["completed_eval_concs"] == [1, 8] assert meta["failed_eval_concs"] == [4] + assert meta["eval_exit_code"] == 1 errors = validate_batch_manifest( str(tmp_path / "meta_env.json"), [str(path) for path in tmp_path.glob("results*.json")], + expected_concs=[1, 4, 8], ) assert any("failed for concurrency: 4" in error for error in errors) assert any("missing completed concurrency: 4" in error for error in errors) @@ -131,7 +136,34 @@ def test_batched_eval_requires_a_valid_manifest(tmp_path: Path) -> None: assert any("unavailable or invalid" in error for error in errors) -def test_validate_scores_warns_when_batch_status_metadata_is_unreadable( +def test_batched_eval_requires_a_result_for_every_workflow_concurrency( + tmp_path: Path, +) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text( + json.dumps({ + "eval_exit_code": 0, + "eval_concs": [1, 4, 8], + "completed_eval_concs": [1, 4, 8], + "failed_eval_concs": [], + }) + ) + result_files = [] + for conc in (1, 8): + result_path = tmp_path / f"results_test_conc{conc}.json" + result_path.write_text('{"results": {}}') + result_files.append(str(result_path)) + + errors = validate_batch_manifest( + str(meta_path), + result_files, + expected_concs=[1, 4, 8], + ) + + assert any("missing result files for concurrency: 4" in error for error in errors) + + +def test_validate_scores_fails_when_workflow_batch_metadata_is_unreadable( tmp_path: Path, monkeypatch, capsys, @@ -157,26 +189,1090 @@ def test_validate_scores_warns_when_batch_status_metadata_is_unreadable( str(meta_path), "--results-glob", str(result_path), + "--expected-concs", + "1 4 8", + ], + ) + + assert validate_scores_main() == 1 + captured = capsys.readouterr() + assert "meta_env.json is unavailable or invalid" in captured.err + + +def test_validate_scores_rejects_single_result_for_workflow_batch( + tmp_path: Path, + monkeypatch, + capsys, +) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text(json.dumps({ + "eval_exit_code": 0, + "conc": 8, + "infmax_model_prefix": "test", + })) + result_path = tmp_path / "results_test.json" + result_path.write_text( + json.dumps({ + "results": { + "custom_eval": { + "exact_match,strict-match": 1.0, + }, + }, + }) + ) + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--meta-env", + str(meta_path), + "--results-glob", + str(result_path), + "--expected-concs", + "1 4 8", + ], + ) + + assert validate_scores_main() == 1 + captured = capsys.readouterr() + assert "workflow requested multiple eval concurrencies" in captured.err + assert "result lacks a concurrency suffix" in captured.err + assert "missing result files for concurrency: 1, 4, 8" in captured.err + + +def test_validate_scores_fails_when_any_concurrency_is_below_threshold( + tmp_path: Path, + monkeypatch, + capsys, +) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text( + json.dumps({ + "eval_exit_code": 0, + "eval_concs": [1, 4], + "completed_eval_concs": [1, 4], + "failed_eval_concs": [], + "infmax_model_prefix": "test", + }) + ) + for conc, score in ((1, 0.9), (4, 0.8)): + (tmp_path / f"results_test_conc{conc}.json").write_text( + json.dumps({ + "results": { + "custom_eval": { + "exact_match,strict-match": score, + }, + }, + }) + ) + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--meta-env", + str(meta_path), + "--results-glob", + str(tmp_path / "results*.json"), + "--expected-concs", + "1 4", + "--min-score", + "0.85", + ], + ) + + assert validate_scores_main() == 1 + captured = capsys.readouterr() + assert "results_test_conc4.json" in captured.err + assert "0.8000 (< 0.85 from min-score)" in captured.err + + +def test_validate_scores_accepts_complete_batch_above_threshold( + tmp_path: Path, + monkeypatch, +) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text( + json.dumps({ + "eval_exit_code": 0, + "eval_concs": [1, 4], + "completed_eval_concs": [1, 4], + "failed_eval_concs": [], + "infmax_model_prefix": "test", + }) + ) + for conc, score in ((1, 0.9), (4, 0.86)): + (tmp_path / f"results_test_conc{conc}.json").write_text( + json.dumps({ + "results": { + "custom_eval": { + "exact_match,strict-match": score, + }, + }, + }) + ) + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--meta-env", + str(meta_path), + "--results-glob", + str(tmp_path / "results*.json"), + "--expected-concs", + "1 4", + "--min-score", + "0.85", + ], + ) + + assert validate_scores_main() == 0 + + +def test_validate_scores_accepts_single_concurrency_above_threshold( + tmp_path: Path, + monkeypatch, +) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text(json.dumps({ + "eval_exit_code": 0, + "conc": 4, + "infmax_model_prefix": "test", + })) + result_path = tmp_path / "results_test.json" + result_path.write_text( + json.dumps({ + "results": { + "custom_eval": { + "exact_match,strict-match": 0.9, + }, + }, + }) + ) + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--meta-env", + str(meta_path), + "--results-glob", + str(result_path), + "--expected-concs", + "4", + "--min-score", + "0.85", ], ) assert validate_scores_main() == 0 + + +def test_validate_scores_fails_when_a_concurrency_has_no_score_metric( + tmp_path: Path, + monkeypatch, + capsys, +) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text( + json.dumps({ + "eval_exit_code": 0, + "eval_concs": [1, 4], + "completed_eval_concs": [1, 4], + "failed_eval_concs": [], + "infmax_model_prefix": "test", + }) + ) + (tmp_path / "results_test_conc1.json").write_text( + json.dumps({ + "results": { + "custom_eval": { + "exact_match,strict-match": 0.9, + }, + }, + }) + ) + (tmp_path / "results_test_conc4.json").write_text( + json.dumps({"results": {"custom_eval": {"alias": "custom"}}}) + ) + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--meta-env", + str(meta_path), + "--results-glob", + str(tmp_path / "results*.json"), + "--expected-concs", + "1 4", + "--min-score", + "0.85", + ], + ) + + assert validate_scores_main() == 1 captured = capsys.readouterr() assert ( - "WARN: could not inspect eval metadata for batched concurrency status" + "results_test_conc4.json has no numeric metrics matching prefix" in captured.err ) +def test_validate_scores_fails_when_one_task_has_no_score_metric( + tmp_path: Path, + monkeypatch, + capsys, +) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text(json.dumps({ + "eval_exit_code": 0, + "conc": 4, + "infmax_model_prefix": "test", + })) + result_path = tmp_path / "results_test.json" + result_path.write_text(json.dumps({ + "results": { + "valid_eval": { + "exact_match,strict-match": 0.9, + }, + "missing_metric_eval": { + "alias": "missing_metric_eval", + }, + }, + })) + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--meta-env", + str(meta_path), + "--results-glob", + str(result_path), + "--expected-concs", + "4", + ], + ) + + assert validate_scores_main() == 1 + assert ( + "missing_metric_eval has no metric matching prefix" + in capsys.readouterr().err + ) + + +def test_validate_scores_fails_for_non_numeric_score( + tmp_path: Path, + monkeypatch, + capsys, +) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text(json.dumps({ + "eval_exit_code": 0, + "conc": 4, + "infmax_model_prefix": "test", + })) + result_path = tmp_path / "results_test.json" + result_path.write_text(json.dumps({ + "results": { + "custom_eval": { + "exact_match,strict-match": "0.99", + }, + }, + })) + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--meta-env", + str(meta_path), + "--results-glob", + str(result_path), + "--expected-concs", + "4", + ], + ) + + assert validate_scores_main() == 1 + assert "has non-numeric value '0.99'" in capsys.readouterr().err + + +def test_validate_scores_fails_for_non_finite_score( + tmp_path: Path, + monkeypatch, + capsys, +) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text(json.dumps({ + "eval_exit_code": 0, + "conc": 4, + "infmax_model_prefix": "test", + })) + result_path = tmp_path / "results_test.json" + result_path.write_text( + '{"results":{"custom_eval":{"exact_match,strict-match":NaN}}}' + ) + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--meta-env", + str(meta_path), + "--results-glob", + str(result_path), + "--expected-concs", + "4", + ], + ) + + assert validate_scores_main() == 1 + captured = capsys.readouterr() + assert "exact_match,strict-match is not finite" in captured.err + + +def test_validate_scores_fails_for_out_of_range_score( + tmp_path: Path, + monkeypatch, + capsys, +) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text(json.dumps({ + "eval_exit_code": 0, + "conc": 4, + "infmax_model_prefix": "test", + })) + result_path = tmp_path / "results_test.json" + result_path.write_text(json.dumps({ + "results": { + "custom_eval": { + "exact_match,strict-match": 1.01, + }, + }, + })) + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--meta-env", + str(meta_path), + "--results-glob", + str(result_path), + "--expected-concs", + "4", + ], + ) + + assert validate_scores_main() == 1 + assert "is outside [0, 1]" in capsys.readouterr().err + + +def test_validate_scores_requires_consistent_metrics_across_concurrencies( + tmp_path: Path, + monkeypatch, + capsys, +) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text(json.dumps({ + "eval_exit_code": 0, + "eval_concs": [1, 4], + "completed_eval_concs": [1, 4], + "failed_eval_concs": [], + "infmax_model_prefix": "test", + })) + for conc, task in ((1, "custom_eval"), (4, "different_eval")): + (tmp_path / f"results_test_conc{conc}.json").write_text(json.dumps({ + "results": { + task: { + "exact_match,strict-match": 0.9, + }, + }, + })) + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--meta-env", + str(meta_path), + "--results-glob", + str(tmp_path / "results*.json"), + "--expected-concs", + "1 4", + ], + ) + + assert validate_scores_main() == 1 + captured = capsys.readouterr() + assert "is missing metrics present in results_test_conc1.json" in captured.err + assert ( + "has unexpected metrics compared with results_test_conc1.json" + in captured.err + ) + + +def test_single_eval_failure_is_staged_and_fails_validation( + tmp_path: Path, + monkeypatch, + capsys, +) -> None: + benchmark_lib = ( + Path(__file__).resolve().parents[2] / "benchmarks" / "benchmark_lib.sh" + ) + env = { + **os.environ, + "BENCHMARK_LIB": str(benchmark_lib), + } + script = r''' +source "$BENCHMARK_LIB" + +run_lm_eval() { + local results_dir + results_dir=$(mktemp -d) + export EVAL_RESULT_DIR="$results_dir" + mkdir -p "$results_dir/nested" + printf '%s' \ + '{"results":{"custom_eval":{"exact_match,strict-match":0.99}}}' \ + > "$results_dir/nested/results_test.json" + return 7 +} + +export EVAL_CONCURRENT_REQUESTS=4 +export EVAL_MAX_MODEL_LEN=4096 +export EVAL_ONLY=true +export MODEL=test-model +export MODEL_NAME=test-model +export MODEL_PREFIX=test +export RUNNER_TYPE=h100 +export FRAMEWORK=vllm +export PRECISION=fp8 +export SPEC_DECODING=none +export IS_MULTINODE=false +export ISL=8192 +export OSL=1024 +export TP=8 +export EP_SIZE=1 +export CONC=4 + +run_eval --framework lm-eval --port 8888 +append_lm_eval_summary +''' + subprocess.run( + ["bash", "-c", script], + cwd=tmp_path, + env=env, + check=True, + text=True, + capture_output=True, + ) + + meta_path = tmp_path / "meta_env.json" + assert json.loads(meta_path.read_text())["eval_exit_code"] == 7 + result_path = tmp_path / "results_test.json" + assert result_path.is_file() + + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--meta-env", + str(meta_path), + "--results-glob", + str(result_path), + "--expected-concs", + "4", + "--min-score", + "0.85", + ], + ) + + assert validate_scores_main() == 1 + assert "eval command failed with exit code 7" in capsys.readouterr().err + + +def test_append_lm_eval_summary_reports_artifact_move_failure( + tmp_path: Path, +) -> None: + benchmark_lib = ( + Path(__file__).resolve().parents[2] / "benchmarks" / "benchmark_lib.sh" + ) + env = { + **os.environ, + "BENCHMARK_LIB": str(benchmark_lib), + } + script = r''' +source "$BENCHMARK_LIB" + +export EVAL_RESULT_DIR="$PWD/eval-output" +mkdir -p "$EVAL_RESULT_DIR/nested" +printf '%s' \ + '{"results":{"custom_eval":{"exact_match,strict-match":0.99}}}' \ + > "$EVAL_RESULT_DIR/nested/results_test.json" + +export EVAL_RUN_EXIT_CODE=0 +export MODEL=test-model +export MODEL_PREFIX=test +export RUNNER_TYPE=h100 +export FRAMEWORK=vllm +export PRECISION=fp8 +export SPEC_DECODING=none +export IS_MULTINODE=false +export ISL=8192 +export OSL=1024 +export TP=8 +export EP_SIZE=1 +export CONC=4 + +mv() { + local arg + for arg in "$@"; do + if [[ "$arg" == *results_test.json ]]; then + return 1 + fi + done + command mv "$@" +} + +if append_lm_eval_summary; then + exit 9 +fi +test -f "$EVAL_RESULT_DIR/nested/results_test.json" +test -f "$PWD/meta_env.json" +''' + completed = subprocess.run( + ["bash", "-c", script], + cwd=tmp_path, + env=env, + check=True, + text=True, + capture_output=True, + ) + + assert "eval artifact staging was incomplete" in completed.stderr + + +def test_run_eval_rejects_duplicate_concurrency_before_execution( + tmp_path: Path, +) -> None: + benchmark_lib = ( + Path(__file__).resolve().parents[2] / "benchmarks" / "benchmark_lib.sh" + ) + trace_path = tmp_path / "ran_eval" + env = { + **os.environ, + "BENCHMARK_LIB": str(benchmark_lib), + "TRACE_PATH": str(trace_path), + } + script = r''' +source "$BENCHMARK_LIB" +run_lm_eval() { + touch "$TRACE_PATH" +} +export EVAL_CONCURRENT_REQUESTS="4 4" +export EVAL_MAX_MODEL_LEN=4096 +export MODEL=test-model +if run_eval --framework lm-eval --port 8888; then + exit 9 +fi +test "$EVAL_RUN_EXIT_CODE" -eq 2 +''' + + subprocess.run( + ["bash", "-c", script], + cwd=tmp_path, + env=env, + check=True, + text=True, + capture_output=True, + ) + assert not trace_path.exists() + + +def test_validate_scores_rejects_invalid_threshold_config( + tmp_path: Path, + monkeypatch, + capsys, +) -> None: + thresholds_path = tmp_path / "thresholds.json" + thresholds_path.write_text( + json.dumps({"default": {"custom_eval": "0.9"}}) + ) + meta_path = tmp_path / "meta_env.json" + meta_path.write_text(json.dumps({ + "eval_exit_code": 0, + "conc": 4, + "infmax_model_prefix": "test", + })) + result_path = tmp_path / "results_test.json" + result_path.write_text(json.dumps({ + "results": { + "custom_eval": { + "exact_match,strict-match": 0.99, + }, + }, + })) + monkeypatch.setattr( + sys, + "argv", + [ + "validate_scores.py", + "--thresholds", + str(thresholds_path), + "--meta-env", + str(meta_path), + "--results-glob", + str(result_path), + "--expected-concs", + "4", + ], + ) + + assert validate_scores_main() == 1 + assert "must be a finite number between 0 and 1" in capsys.readouterr().err + + +def test_batched_eval_rejects_duplicate_result_files(tmp_path: Path) -> None: + meta_path = tmp_path / "meta_env.json" + meta_path.write_text(json.dumps({ + "eval_exit_code": 0, + "eval_concs": [4], + "completed_eval_concs": [4], + "failed_eval_concs": [], + })) + result_files = [] + for name in ("results_test_conc4.json", "results_test_conc4_2.json"): + result_path = tmp_path / name + result_path.write_text('{"results": {}}') + result_files.append(str(result_path)) + + errors = validate_batch_manifest( + str(meta_path), + result_files, + expected_concs=[4], + ) + + assert any( + "duplicate result files for concurrency: 4" in error + for error in errors + ) + + def test_amd_multinode_container_inherits_eval_concurrency_list() -> None: - job_slurm = ( + amd_utils = ( Path(__file__).resolve().parents[2] / "benchmarks" / "multi_node" / "amd_utils" - / "job.slurm" ) + job_slurm = amd_utils / "job.slurm" contents = job_slurm.read_text() + submit_contents = (amd_utils / "submit.sh").read_text() + + assert r'-e \"EVAL_CONC=\$EVAL_CONC\"' in contents + assert "\n -e EVAL_CONC\n" not in contents + for env_name in ( + "RUN_EVAL", + "EVAL_ONLY", + "EVAL_FRAMEWORK", + "EVAL_TASKS_DIR", + "EVAL_MAX_MODEL_LEN", + "MODEL", + "MAX_MODEL_LEN", + "FRAMEWORK", + "PRECISION", + "MODEL_PREFIX", + "RUNNER_TYPE", + "RESULT_FILENAME", + "SPEC_DECODING", + "PREFILL_TP_SIZE", + "PREFILL_ENABLE_EP", + "PREFILL_ENABLE_DP", + "DECODE_TP_SIZE", + "DECODE_ENABLE_EP", + "DECODE_ENABLE_DP", + "IS_MULTINODE", + ): + assert rf"-e {env_name}=\${env_name}" in contents + assert "-e MODEL_PATH=$DOCKER_MODEL_PATH" in contents + for env_name in ( + "EVAL_CONC", + "EVAL_FRAMEWORK", + "EVAL_TASKS_DIR", + "EVAL_MAX_MODEL_LEN", + "MODEL", + "MAX_MODEL_LEN", + ): + export_line = f'export {env_name}="${{{env_name}:-}}"' + assert export_line in contents + assert export_line in submit_contents + + +def test_direct_docker_launcher_forwards_workflow_metadata() -> None: + launcher = ( + Path(__file__).resolve().parents[2] / "runners" / "launch_h100-cr.sh" + ).read_text() + + for env_name in ( + "IMAGE", + "MODEL_PREFIX", + "FRAMEWORK", + "PRECISION", + "EP_SIZE", + "DP_ATTENTION", + "SPEC_DECODING", + "DISAGG", + "RUN_EVAL", + "EVAL_ONLY", + "EVAL_FRAMEWORK", + "EVAL_TASKS_DIR", + "EVAL_MAX_MODEL_LEN", + "OPENAI_API_KEY", + "RUNNER_TYPE", + "RESULT_FILENAME", + "SCENARIO_TYPE", + "SCENARIO_SUBDIR", + "IS_AGENTIC", + "OFFLOADING", + "TOTAL_CPU_DRAM_GB", + "DURATION", + "RESULT_DIR", + "PYTHONPYCACHEPREFIX", + ): + assert f" {env_name}\n" in launcher + + +def test_multinode_launchers_replace_container_owned_eval_artifacts() -> None: + runners_dir = Path(__file__).resolve().parents[2] / "runners" + launchers = ( + "launch_b200-dgxc.sh", + "launch_b300-nv.sh", + "launch_gb200-nv.sh", + "launch_gb300-nv.sh", + "launch_h100-dgxc-slurm.sh", + "launch_h200-dgxc-slurm.sh", + "launch_mi355x-amds.sh", + ) + + for launcher_name in launchers: + contents = (runners_dir / launcher_name).read_text() + assert 'eval_dest="$GITHUB_WORKSPACE/$(basename "$eval_file")"' in contents + assert 'rm -f "$eval_dest"' in contents + assert 'if cp "$eval_file" "$eval_dest"; then' in contents + assert 'cp "$eval_file" "$GITHUB_WORKSPACE/"' not in contents + + amd_launcher = (runners_dir / "launch_mi355x-amds.sh").read_text() + assert ( + '[[ "${RUN_EVAL:-false}" == "true" || ' + '"${EVAL_ONLY:-false}" == "true" ]]' + in amd_launcher + ) + + +def test_atom_launcher_uses_and_records_requested_parallelism() -> None: + server = ( + Path(__file__).resolve().parents[2] + / "benchmarks" + / "multi_node" + / "amd_utils" + / "server_atom.sh" + ).read_text() + + assert 'DECODE_PARALLEL_ARGS=(-tp "$DECODE_TP_SIZE")' in server + assert '[[ "$PREFILL_ENABLE_EP" == "true" ]]' in server + assert '[[ "$PREFILL_ENABLE_DP" == "true" ]]' in server + assert '[[ "$DECODE_ENABLE_EP" == "true" ]]' in server + assert '[[ "$DECODE_ENABLE_DP" == "true" ]]' in server + for metadata_assignment in ( + 'export EP_SIZE=1', + 'export PREFILL_EP=1', + 'export DECODE_EP=1', + 'export DP_ATTENTION="${PREFILL_ENABLE_DP}"', + 'export PREFILL_DP_ATTENTION="${PREFILL_ENABLE_DP}"', + 'export DECODE_DP_ATTENTION="${DECODE_ENABLE_DP}"', + ): + assert metadata_assignment in server + + +def test_eval_workflows_pass_their_requested_concurrencies_to_validation() -> None: + repo_root = Path(__file__).resolve().parents[2] + single_workflow = ( + repo_root / ".github" / "workflows" / "benchmark-tmpl.yml" + ).read_text() + multinode_workflow = ( + repo_root / ".github" / "workflows" / "benchmark-multinode-tmpl.yml" + ).read_text() + run_sweep = ( + repo_root / ".github" / "workflows" / "run-sweep.yml" + ).read_text() + + assert '--expected-concs "${CONC}"' in single_workflow + assert 'expected_concs="${EVAL_CONC}"' in multinode_workflow + assert '--expected-concs "${expected_concs}"' in multinode_workflow + verify_condition = ( + "(success() || failure()) && " + "(inputs.run-eval || inputs.eval-only)" + ) + assert verify_condition in single_workflow + assert verify_condition in multinode_workflow + single_inputs = run_sweep.split("&single-node-inputs", 1)[1].split( + "sweep-single-node-8k1k", + 1, + )[0] + assert "run-eval: false" in single_inputs + assert "run-eval: ${{ matrix.config.run-eval }}" not in run_sweep + + +def test_eval_regression_tests_run_for_every_inspected_runtime_path() -> None: + workflow = ( + Path(__file__).resolve().parents[2] + / ".github" + / "workflows" + / "test-changelog-gate.yml" + ).read_text() + + for watched_path in ( + '".github/workflows/benchmark-tmpl.yml"', + '".github/workflows/benchmark-multinode-tmpl.yml"', + '"benchmarks/benchmark_lib.sh"', + '"benchmarks/multi_node/amd_utils/**"', + '"benchmarks/single_node/**"', + '"runners/launch_*.sh"', + '"utils/evals/**"', + ): + assert watched_path in workflow + assert "utils/evals/test_batched_eval.py" in workflow + + +def test_eval_scripts_in_audited_scope_use_shared_finalization() -> None: + repo_root = Path(__file__).resolve().parents[2] + benchmark_lib = (repo_root / "benchmarks" / "benchmark_lib.sh").resolve() + ignored_scripts = { + "dsv4_fp4_mi355x_atom_mtp.sh", + "dsv4_fp4_mi355x_sglang_mtp.sh", + "dsv4_fp4_mi355x_vllm_mtp.sh", + } + source_pattern = re.compile( + r'source "\$\(dirname "\$0"\)/([^"]*benchmark_lib\.sh)"' + ) + failures = [] + + for script in sorted( + (repo_root / "benchmarks" / "single_node" / "fixed_seq_len").rglob( + "*.sh" + ) + ): + if "deprecated" in script.parts or script.name in ignored_scripts: + continue + contents = script.read_text() + if "run_eval --framework" not in contents: + continue + if "append_lm_eval_summary" not in contents: + failures.append(f"{script}: missing append_lm_eval_summary") + match = source_pattern.search(contents) + if match is None: + failures.append(f"{script}: missing relative benchmark_lib source") + continue + resolved = (script.parent / match.group(1)).resolve() + if resolved != benchmark_lib: + failures.append(f"{script}: resolves benchmark_lib to {resolved}") + + assert not failures, "\n".join(failures) + + +def test_slurm_container_launchers_export_the_workflow_environment() -> None: + runners_dir = Path(__file__).resolve().parents[2] / "runners" + checked = [] + + for launcher in sorted(runners_dir.glob("launch_*.sh")): + contents = launcher.read_text() + if "--container-image=" not in contents: + continue + checked.append(launcher.name) + assert "--export=ALL" in contents, ( + f"{launcher.name} starts a Slurm container without exporting " + "the workflow environment" + ) + + assert checked + + +def test_srtctl_launchers_export_the_eval_workspace() -> None: + runners_dir = Path(__file__).resolve().parents[2] / "runners" + launchers = ( + "launch_b200-dgxc.sh", + "launch_b300-nv.sh", + "launch_gb200-nv.sh", + "launch_gb300-nv.sh", + "launch_h100-dgxc-slurm.sh", + "launch_h200-dgxc-slurm.sh", + ) + + for launcher_name in launchers: + contents = (runners_dir / launcher_name).read_text() + assert 'export EVAL_ONLY="${EVAL_ONLY:-false}"' in contents + assert 'export INFMAX_WORKSPACE="$GITHUB_WORKSPACE"' in contents + assert "srtctl apply" in contents + + +def test_atom_logging_uses_env_and_native_router_flag_without_stream_filter() -> None: + repo_root = Path(__file__).resolve().parents[2] + amd_utils = repo_root / "benchmarks" / "multi_node" / "amd_utils" + env_contents = (amd_utils / "env_atom.sh").read_text() + job_contents = (amd_utils / "job.slurm").read_text() + server_contents = (amd_utils / "server_atom.sh").read_text() + setup_contents = (amd_utils / "setup_deps.sh").read_text() + + assert 'ATOM_LOG_LEVEL="${ATOM_LOG_LEVEL:-WARNING}"' in env_contents + assert ( + 'ATOM_UVICORN_LOG_LEVEL="${ATOM_UVICORN_LOG_LEVEL:-warning}"' + in env_contents + ) + assert 'ATOM_UVICORN_ACCESS_LOG="${ATOM_UVICORN_ACCESS_LOG:-0}"' in env_contents + assert 'ATOMESH_LOG_LEVEL="${ATOMESH_LOG_LEVEL:-warn}"' in env_contents + assert r"-e ATOM_LOG_LEVEL=\${ATOM_LOG_LEVEL:-WARNING}" in job_contents + assert ( + r"-e ATOM_UVICORN_LOG_LEVEL=\${ATOM_UVICORN_LOG_LEVEL:-warning}" + in job_contents + ) + assert ( + r"-e ATOM_UVICORN_ACCESS_LOG=\${ATOM_UVICORN_ACCESS_LOG:-0}" + in job_contents + ) + assert r"-e ATOMESH_LOG_LEVEL=\${ATOMESH_LOG_LEVEL:-warn}" in job_contents + assert "--log-level ${ATOMESH_LOG_LEVEL}" in server_contents + assert "filter_atom_logs.sh" not in server_contents + assert 'os.getenv("ATOM_LOG_LEVEL", "WARNING")' in setup_contents + assert "logger.setLevel(_atom_log_level)" in setup_contents + assert "console_handler.setLevel(_atom_log_level)" in setup_contents + assert "logger.propagate = False" in setup_contents + assert "ATOM_UVICORN_ACCESS_LOG" in setup_contents + assert 'access_log=__import__("os").getenv(' in setup_contents + assert "verify_logger_behavior()" in setup_contents + assert "verify_uvicorn_behavior()" in setup_contents + assert not (amd_utils / "filter_atom_logs.sh").exists() + + +def test_atom_server_aborts_when_setup_or_environment_fails(tmp_path: Path) -> None: + server = ( + Path(__file__).resolve().parents[2] + / "benchmarks" + / "multi_node" + / "amd_utils" + / "server_atom.sh" + ) + + for failing_script, expected_error in ( + ("setup_deps.sh", "failed to initialize ATOM dependencies"), + ("env_atom.sh", "failed to initialize ATOM environment"), + ): + atom_workspace = tmp_path / failing_script + atom_workspace.mkdir() + (atom_workspace / "setup_deps.sh").write_text("return 0\n") + (atom_workspace / "env_atom.sh").write_text("return 0\n") + (atom_workspace / failing_script).write_text("return 42\n") + + result = subprocess.run( + ["bash", str(server)], + env={ + **os.environ, + "ATOM_WS_PATH": str(atom_workspace), + }, + text=True, + capture_output=True, + ) + + assert result.returncode == 1 + assert expected_error in result.stderr + + +def test_atom_logging_patch_is_idempotent(tmp_path: Path) -> None: + repo_root = Path(__file__).resolve().parents[2] + setup_deps = ( + repo_root / "benchmarks" / "multi_node" / "amd_utils" / "setup_deps.sh" + ) + atom_dir = tmp_path / "atom" + logger_path = atom_dir / "utils" / "__init__.py" + api_path = atom_dir / "entrypoints" / "openai" / "api_server.py" + logger_path.parent.mkdir(parents=True) + api_path.parent.mkdir(parents=True) + (atom_dir / "__init__.py").write_text("") + logger_path.write_text( + "import logging\n" + "import os\n\n" + 'logger = logging.getLogger("atom")\n\n' + "def getLogger():\n" + " global logger\n" + " if not logger.handlers:\n" + " logger.setLevel(logging.DEBUG)\n" + " console_handler = logging.StreamHandler()\n" + " console_handler.setLevel(logging.INFO)\n" + " logger.addHandler(console_handler)\n" + " return logger\n\n" + "logger = getLogger()\n" + ) + api_path.write_text( + "def main(app, args, uvicorn):\n" + " uvicorn.run(app, host=args.host, port=args.server_port)\n" + ) + env = { + **os.environ, + "PYTHONPATH": str(tmp_path), + "SETUP_DEPS": str(setup_deps), + } + script = r''' +_SETUP_INSTALLED=() +eval "$(sed -n '/^patch_atom_logging_controls()/,/^}/p' "$SETUP_DEPS")" +patch_atom_logging_controls +patch_atom_logging_controls +''' + + completed = subprocess.run( + ["bash", "-c", script], + env=env, + check=True, + text=True, + capture_output=True, + ) + + logger_contents = logger_path.read_text() + api_contents = api_path.read_text() + assert logger_contents.count('os.getenv("ATOM_LOG_LEVEL"') == 1 + assert logger_contents.count("logger.setLevel(_atom_log_level)") == 1 + assert logger_contents.count("console_handler.setLevel(_atom_log_level)") == 1 + assert logger_contents.count("logger.propagate = False") == 1 + assert api_contents.count("ATOM_UVICORN_LOG_LEVEL") == 1 + assert api_contents.count("ATOM_UVICORN_ACCESS_LOG") == 1 + assert "engine=WARNING propagation=off uvicorn=warning access_log=off" in ( + completed.stdout + ) + subprocess.run( + ["python3", "-m", "py_compile", str(logger_path), str(api_path)], + check=True, + text=True, + capture_output=True, + ) + + +def test_amd_servers_fail_when_eval_artifact_finalization_fails() -> None: + amd_utils = ( + Path(__file__).resolve().parents[2] + / "benchmarks" + / "multi_node" + / "amd_utils" + ) - assert "-e EVAL_CONC\n" in contents - assert r"-e EVAL_CONC=\$EVAL_CONC" not in contents + for server_name in ("server_atom.sh", "server_sglang.sh", "server_vllm.sh"): + contents = (amd_utils / server_name).read_text() + assert "append_lm_eval_summary; then" in contents + assert ( + 'if ! _copy_lm_eval_artifacts /workspace "$EVAL_COPY_DIR"; then' + in contents + ) + assert 'echo "ERROR: failed to finalize eval artifacts"' in contents + assert 'echo "ERROR: failed to stage eval artifacts' in contents diff --git a/utils/evals/validate_scores.py b/utils/evals/validate_scores.py index f74ed267f..760f89294 100644 --- a/utils/evals/validate_scores.py +++ b/utils/evals/validate_scores.py @@ -28,13 +28,16 @@ Usage: python3 utils/evals/validate_scores.py + python3 utils/evals/validate_scores.py --expected-concs "1 2 4 8" python3 utils/evals/validate_scores.py --thresholds my_thresholds.json python3 utils/evals/validate_scores.py --model-prefix dsv4 python3 utils/evals/validate_scores.py --min-score 0.90 # flat fallback """ import argparse +from collections import Counter import glob import json +import math import os import re import sys @@ -43,6 +46,28 @@ CONC_SUFFIX_RE = re.compile(r"_conc(\d+)(?:_\d+)?\.json$") +def _validate_threshold_map(values: object, location: str) -> dict[str, float]: + """Validate and normalize one task-to-threshold mapping.""" + if not isinstance(values, dict): + raise ValueError(f"{location} must be a JSON object") + + normalized = {} + for task, value in values.items(): + if not isinstance(task, str) or not task: + raise ValueError(f"{location} contains an invalid task name") + if ( + not isinstance(value, (int, float)) + or isinstance(value, bool) + or not math.isfinite(value) + or not 0 <= value <= 1 + ): + raise ValueError( + f"{location}.{task} must be a finite number between 0 and 1" + ) + normalized[task] = float(value) + return normalized + + def load_config(path: str) -> dict: """Load thresholds config, normalized to {"default": {...}, "models": {...}}. @@ -56,8 +81,34 @@ def load_config(path: str) -> dict: raise ValueError("thresholds config must be a JSON object") if "default" not in cfg and "models" not in cfg: # Legacy flat format: the whole object is the per-task default. - return {"default": cfg, "models": {}} - return {"default": cfg.get("default", {}), "models": cfg.get("models", {})} + return { + "default": _validate_threshold_map(cfg, "default"), + "models": {}, + } + + unknown_keys = sorted(set(cfg) - {"default", "models"}) + if unknown_keys: + raise ValueError( + "thresholds config contains unsupported keys: " + + ", ".join(unknown_keys) + ) + + models = cfg.get("models", {}) + if not isinstance(models, dict): + raise ValueError("models must be a JSON object") + normalized_models = {} + for prefix, thresholds in models.items(): + if not isinstance(prefix, str) or not prefix: + raise ValueError("models contains an invalid model prefix") + normalized_models[prefix] = _validate_threshold_map( + thresholds, + f"models.{prefix}", + ) + + return { + "default": _validate_threshold_map(cfg.get("default", {}), "default"), + "models": normalized_models, + } def detect_model_prefix(meta_env_path: str, override: str | None) -> str | None: @@ -67,7 +118,7 @@ def detect_model_prefix(meta_env_path: str, override: str | None) -> str | None: try: with open(meta_env_path) as f: prefix = json.load(f).get("infmax_model_prefix") - if prefix and prefix != "unknown": + if isinstance(prefix, str) and prefix and prefix != "unknown": return prefix except (json.JSONDecodeError, OSError, AttributeError): pass @@ -88,68 +139,197 @@ def resolve_threshold(config: dict, prefix: str | None, task: str, fallback: flo return fallback, "min-score" +def parse_expected_concs(raw_value: str | None) -> list[int] | None: + """Parse a workflow-provided, space-separated concurrency list.""" + if raw_value is None: + return None + if not raw_value.strip(): + raise ValueError("expected concurrency list is empty") + + values = raw_value.split() + if not all(re.fullmatch(r"[1-9][0-9]*", value) for value in values): + raise ValueError( + "expected concurrencies must be positive integers separated by spaces" + ) + + concs = [int(value) for value in values] + if len(set(concs)) != len(concs): + raise ValueError("expected concurrency list contains duplicates") + return concs + + +def _is_valid_conc_list( + values: object, + *, + allow_empty: bool = True, +) -> bool: + """Return whether a value is a list of unique positive integer concurrencies.""" + return ( + isinstance(values, list) + and (allow_empty or bool(values)) + and all( + isinstance(value, int) and not isinstance(value, bool) and value > 0 + for value in values + ) + and len(set(values)) == len(values) + ) + + def validate_batch_manifest( meta_env_path: str, result_files: list[str], + expected_concs: list[int] | None = None, ) -> list[str]: - """Validate that a batched eval produced every requested concurrency.""" + """Validate that eval artifacts cover every workflow-requested concurrency.""" + errors = [] + if expected_concs is not None and not _is_valid_conc_list( + expected_concs, + allow_empty=False, + ): + return ["workflow supplied an invalid expected concurrency list"] + if not result_files: + errors.append("eval produced no result files") + try: with open(meta_env_path) as f: meta = json.load(f) - except (json.JSONDecodeError, OSError) as exc: - if any( + if not isinstance(meta, dict): + raise ValueError("metadata root must be a JSON object") + except (json.JSONDecodeError, OSError, ValueError) as exc: + if expected_concs is not None or any( CONC_SUFFIX_RE.search(Path(result_file).name) for result_file in result_files ): - return [ - "batched eval result files exist but " - f"{meta_env_path} is unavailable or invalid: {exc}" - ] - return [] - - if "eval_concs" not in meta: - return [] - - expected = meta.get("eval_concs") - completed = meta.get("completed_eval_concs") - failed = meta.get("failed_eval_concs") - if not all(isinstance(values, list) for values in (expected, completed, failed)): - return ["batched eval metadata must contain list-valued concurrency fields"] - if not all( - isinstance(value, int) and value > 0 - for values in (expected, completed, failed) - for value in values - ): - return ["batched eval metadata contains an invalid concurrency"] + errors.append( + f"eval metadata {meta_env_path} is unavailable or invalid: {exc}" + ) + return errors - errors = [] - expected_set = set(expected) - completed_set = set(completed) - failed_set = set(failed) - if len(expected_set) != len(expected): - errors.append("batched eval metadata contains duplicate expected concurrencies") - if len(completed_set) != len(completed): - errors.append("batched eval metadata contains duplicate completed concurrencies") - if failed_set: + eval_exit_code = meta.get("eval_exit_code") + if expected_concs is not None and ( + not isinstance(eval_exit_code, int) + or isinstance(eval_exit_code, bool) + or eval_exit_code < 0 + ): errors.append( - "batched eval failed for concurrency: " - + ", ".join(str(value) for value in sorted(failed_set)) + "eval metadata must contain a non-negative integer eval_exit_code" ) - if completed_set != expected_set: - missing = sorted(expected_set - completed_set) - unexpected = sorted(completed_set - expected_set) - if missing: + elif "eval_exit_code" in meta and ( + not isinstance(eval_exit_code, int) + or isinstance(eval_exit_code, bool) + or eval_exit_code < 0 + ): + errors.append("eval metadata contains an invalid eval_exit_code") + elif isinstance(eval_exit_code, int) and eval_exit_code != 0: + errors.append(f"eval command failed with exit code {eval_exit_code}") + + metadata_expected = meta.get("eval_concs") + metadata_is_batched = "eval_concs" in meta + if expected_concs is None and not metadata_is_batched: + if any( + CONC_SUFFIX_RE.search(Path(result_file).name) + for result_file in result_files + ): errors.append( - "batched eval is missing completed concurrency: " - + ", ".join(str(value) for value in missing) + "concurrency-suffixed eval results exist but batched metadata is missing" ) - if unexpected: + return errors + + if expected_concs is not None and len(expected_concs) == 1 and not metadata_is_batched: + metadata_conc = meta.get("conc") + if ( + not isinstance(metadata_conc, int) + or isinstance(metadata_conc, bool) + or metadata_conc != expected_concs[0] + ): + errors.append( + "eval metadata concurrency " + f"{metadata_conc!r} does not match workflow request " + f"{expected_concs[0]}" + ) + if len(result_files) != 1: + errors.append( + "non-batched eval must produce exactly one result file; " + f"found {len(result_files)}" + ) + suffixed_results = [ + result_file + for result_file in result_files + if CONC_SUFFIX_RE.search(Path(result_file).name) + ] + if suffixed_results: + errors.append( + "non-batched eval produced concurrency-suffixed result files" + ) + return errors + + if not metadata_is_batched: + errors.append( + "workflow requested multiple eval concurrencies but batched metadata is missing" + ) + expected_set = set(expected_concs or []) + else: + completed = meta.get("completed_eval_concs") + failed = meta.get("failed_eval_concs") + if not ( + _is_valid_conc_list(metadata_expected, allow_empty=False) + and _is_valid_conc_list(completed) + and _is_valid_conc_list(failed) + ): errors.append( - "batched eval completed unexpected concurrency: " - + ", ".join(str(value) for value in unexpected) + "batched eval metadata must contain unique, positive-integer " + "concurrency lists" ) + expected_set = set(expected_concs or []) + else: + metadata_expected_set = set(metadata_expected) + expected_set = set(expected_concs or metadata_expected) + completed_set = set(completed) + failed_set = set(failed) + overlap = sorted(completed_set & failed_set) + if overlap: + errors.append( + "batched eval metadata marks concurrency as both completed " + "and failed: " + + ", ".join(str(value) for value in overlap) + ) + + if ( + expected_concs is not None + and metadata_expected_set != expected_set + ): + missing = sorted(expected_set - metadata_expected_set) + unexpected = sorted(metadata_expected_set - expected_set) + if missing: + errors.append( + "batched eval metadata is missing workflow concurrency: " + + ", ".join(str(value) for value in missing) + ) + if unexpected: + errors.append( + "batched eval metadata has unexpected concurrency: " + + ", ".join(str(value) for value in unexpected) + ) + if failed_set: + errors.append( + "batched eval failed for concurrency: " + + ", ".join(str(value) for value in sorted(failed_set)) + ) + if completed_set != expected_set: + missing = sorted(expected_set - completed_set) + unexpected = sorted(completed_set - expected_set) + if missing: + errors.append( + "batched eval is missing completed concurrency: " + + ", ".join(str(value) for value in missing) + ) + if unexpected: + errors.append( + "batched eval completed unexpected concurrency: " + + ", ".join(str(value) for value in unexpected) + ) - actual_concs = set() + actual_conc_counts = Counter() for result_file in result_files: match = CONC_SUFFIX_RE.search(Path(result_file).name) if match is None: @@ -157,8 +337,18 @@ def validate_batch_manifest( f"batched eval result lacks a concurrency suffix: {result_file}" ) continue - actual_concs.add(int(match.group(1))) + actual_conc_counts[int(match.group(1))] += 1 + duplicate_results = sorted( + conc for conc, count in actual_conc_counts.items() if count > 1 + ) + if duplicate_results: + errors.append( + "batched eval has duplicate result files for concurrency: " + + ", ".join(str(value) for value in duplicate_results) + ) + + actual_concs = set(actual_conc_counts) missing_results = sorted(expected_set - actual_concs) unexpected_results = sorted(actual_concs - expected_set) if missing_results: @@ -200,21 +390,43 @@ def main() -> int: "--results-glob", default="results*.json", help="Glob pattern for result files (default: 'results*.json')", ) + parser.add_argument( + "--expected-concs", + default=None, + help=( + "Space-separated concurrencies requested by the workflow. When set, " + "metadata and result coverage must match exactly." + ), + ) args = parser.parse_args() + try: + expected_concs = parse_expected_concs(args.expected_concs) + except ValueError as exc: + print(f"FAIL: {exc}", file=sys.stderr) + return 1 + + if not math.isfinite(args.min_score) or not 0 <= args.min_score <= 1: + print( + "FAIL: --min-score must be a finite number between 0 and 1", + file=sys.stderr, + ) + return 1 + if not args.metric_prefix: + print("FAIL: --metric-prefix must not be empty", file=sys.stderr) + return 1 + # Load thresholds config - config = {"default": {}, "models": {}} - thresholds_path = args.thresholds - if thresholds_path is None: - default_path = Path(__file__).parent / "thresholds.json" - if default_path.exists(): - thresholds_path = str(default_path) - if thresholds_path: - try: - config = load_config(thresholds_path) - print(f"Loaded thresholds from {thresholds_path}") - except (json.JSONDecodeError, OSError, ValueError) as e: - print(f"WARN: could not load thresholds from {thresholds_path}: {e}", file=sys.stderr) + thresholds_path = args.thresholds or str(Path(__file__).parent / "thresholds.json") + try: + config = load_config(thresholds_path) + print(f"Loaded thresholds from {thresholds_path}") + except (json.JSONDecodeError, OSError, ValueError) as e: + print( + f"FAIL: could not load thresholds from {thresholds_path}: {e}", + file=sys.stderr, + ) + return 1 # Identify the model so per-model thresholds can apply prefix = detect_model_prefix(args.meta_env, args.model_prefix) @@ -228,45 +440,146 @@ def main() -> int: failed = False checked = 0 result_files = sorted(glob.glob(args.results_glob)) + expected_metric_set: set[tuple[str, str]] | None = None + expected_metric_source: str | None = None - manifest_errors = validate_batch_manifest(args.meta_env, result_files) + manifest_errors = validate_batch_manifest( + args.meta_env, + result_files, + expected_concs=expected_concs, + ) for error in manifest_errors: print(f"FAIL: {error}", file=sys.stderr) failed = True if not manifest_errors: - try: - with open(args.meta_env) as f: - if "eval_concs" in json.load(f): - print("PASS: batched eval produced every requested concurrency") - except (json.JSONDecodeError, OSError) as exc: + if expected_concs is not None: print( - "WARN: could not inspect eval metadata for batched concurrency " - f"status: {exc}", - file=sys.stderr, + "PASS: eval produced every requested concurrency: " + + ", ".join(str(value) for value in expected_concs) ) + else: + try: + with open(args.meta_env) as f: + if "eval_concs" in json.load(f): + print("PASS: batched eval produced every requested concurrency") + except (json.JSONDecodeError, OSError) as exc: + print( + "WARN: could not inspect eval metadata for batched concurrency " + f"status: {exc}", + file=sys.stderr, + ) for f in result_files: - with open(f) as fh: - data = json.load(fh) - for task, metrics in data.get("results", {}).items(): + try: + with open(f) as fh: + data = json.load(fh) + except (json.JSONDecodeError, OSError) as exc: + print(f"FAIL: could not read eval result {f}: {exc}", file=sys.stderr) + failed = True + continue + + file_checked = 0 + file_metric_set: set[tuple[str, str]] = set() + results = data.get("results", {}) if isinstance(data, dict) else {} + if not isinstance(results, dict): + results = {} + for task, metrics in results.items(): + if not isinstance(metrics, dict): + print( + f"FAIL: {Path(f).name}: {task} result is not a JSON object", + file=sys.stderr, + ) + failed = True + continue min_score, source = resolve_threshold(config, prefix, task, args.min_score) + task_has_metric = False + task_checked = 0 for name, val in metrics.items(): if not name.startswith(args.metric_prefix) or "stderr" in name: continue - if not isinstance(val, (int, float)): + task_has_metric = True + file_metric_set.add((task, name)) + if not isinstance(val, (int, float)) or isinstance(val, bool): + print( + f"FAIL: {Path(f).name}: {task} {name} has non-numeric " + f"value {val!r}", + file=sys.stderr, + ) + failed = True continue checked += 1 - if val < min_score: + file_checked += 1 + task_checked += 1 + if not math.isfinite(val): + print( + f"FAIL: {Path(f).name}: {task} {name} is not finite", + file=sys.stderr, + ) + failed = True + elif not 0 <= val <= 1: print( - f"FAIL: {task} {name} = {val:.4f} (< {min_score} from {source})", + f"FAIL: {Path(f).name}: {task} {name} = {val:.4f} " + "is outside [0, 1]", + file=sys.stderr, + ) + failed = True + elif val < min_score: + print( + f"FAIL: {Path(f).name}: {task} {name} = {val:.4f} " + f"(< {min_score} from {source})", file=sys.stderr, ) failed = True else: - print(f"PASS: {task} {name} = {val:.4f} (>= {min_score} from {source})") + print( + f"PASS: {Path(f).name}: {task} {name} = {val:.4f} " + f"(>= {min_score} from {source})" + ) + if not task_has_metric: + print( + f"FAIL: {Path(f).name}: {task} has no metric matching " + f"prefix {args.metric_prefix!r}", + file=sys.stderr, + ) + failed = True + elif task_checked == 0: + failed = True + if file_checked == 0: + print( + f"FAIL: {Path(f).name} has no numeric metrics matching " + f"prefix {args.metric_prefix!r}", + file=sys.stderr, + ) + failed = True + elif expected_metric_set is None: + expected_metric_set = file_metric_set + expected_metric_source = Path(f).name + elif file_metric_set != expected_metric_set: + missing_metrics = sorted(expected_metric_set - file_metric_set) + unexpected_metrics = sorted(file_metric_set - expected_metric_set) + if missing_metrics: + print( + f"FAIL: {Path(f).name} is missing metrics present in " + f"{expected_metric_source}: " + + ", ".join(f"{task}/{metric}" for task, metric in missing_metrics), + file=sys.stderr, + ) + if unexpected_metrics: + print( + f"FAIL: {Path(f).name} has unexpected metrics compared with " + f"{expected_metric_source}: " + + ", ".join( + f"{task}/{metric}" for task, metric in unexpected_metrics + ), + file=sys.stderr, + ) + failed = True if checked == 0: - print("WARN: no metrics matched prefix '{}'".format(args.metric_prefix), file=sys.stderr) + print( + "FAIL: no metrics matched prefix '{}'".format(args.metric_prefix), + file=sys.stderr, + ) return 1 if (failed or checked == 0) else 0 diff --git a/utils/matrix_logic/test_validation.py b/utils/matrix_logic/test_validation.py index 0f5bc4424..53d70fece 100644 --- a/utils/matrix_logic/test_validation.py +++ b/utils/matrix_logic/test_validation.py @@ -276,11 +276,16 @@ def test_valid_entry(self, valid_single_node_matrix_entry): assert entry.conc == 4 assert entry.framework == "sglang" - def test_conc_as_list(self, valid_single_node_matrix_entry): - """Conc can be a list of integers.""" + def test_conc_as_list_is_rejected(self, valid_single_node_matrix_entry): + """Single-node workflow entries require one concurrency integer.""" valid_single_node_matrix_entry["conc"] = [4, 8, 16, 32, 64] - entry = SingleNodeMatrixEntry(**valid_single_node_matrix_entry) - assert entry.conc == [4, 8, 16, 32, 64] + with pytest.raises(Exception): + SingleNodeMatrixEntry(**valid_single_node_matrix_entry) + + def test_eval_only_requires_run_eval(self, valid_single_node_matrix_entry): + valid_single_node_matrix_entry["eval-only"] = True + with pytest.raises(Exception): + SingleNodeMatrixEntry(**valid_single_node_matrix_entry) def test_spec_decoding_values(self, valid_single_node_matrix_entry): """Spec decoding should accept valid literal values.""" @@ -389,12 +394,34 @@ def test_all_eval_concurrency_batch_marker( self, valid_multinode_matrix_entry, ): + valid_multinode_matrix_entry["run-eval"] = True valid_multinode_matrix_entry["eval-all-concs"] = True entry = MultiNodeMatrixEntry(**valid_multinode_matrix_entry) assert entry.eval_all_concs is True + def test_eval_conc_must_be_in_conc_list( + self, + valid_multinode_matrix_entry, + ): + valid_multinode_matrix_entry["run-eval"] = True + valid_multinode_matrix_entry["eval-conc"] = 64 + + with pytest.raises(Exception): + MultiNodeMatrixEntry(**valid_multinode_matrix_entry) + + def test_eval_all_concs_rejects_eval_conc( + self, + valid_multinode_matrix_entry, + ): + valid_multinode_matrix_entry["run-eval"] = True + valid_multinode_matrix_entry["eval-all-concs"] = True + valid_multinode_matrix_entry["eval-conc"] = 2150 + + with pytest.raises(Exception): + MultiNodeMatrixEntry(**valid_multinode_matrix_entry) + def test_conc_must_be_list(self, valid_multinode_matrix_entry): """Conc must be a list for multinode.""" valid_multinode_matrix_entry["conc"] = 2150 # Single int, not list diff --git a/utils/matrix_logic/validation.py b/utils/matrix_logic/validation.py index 2b7bf3baf..4a140d6f0 100644 --- a/utils/matrix_logic/validation.py +++ b/utils/matrix_logic/validation.py @@ -96,13 +96,19 @@ class SingleNodeMatrixEntry(BaseModel): tp: int ep: int dp_attn: bool = Field(alias=Fields.DP_ATTN.value) - conc: Union[int, List[int]] + conc: int = Field(gt=0) max_model_len: int = Field(alias=Fields.MAX_MODEL_LEN.value) exp_name: str = Field(alias=Fields.EXP_NAME.value) disagg: bool run_eval: bool = Field(alias=Fields.RUN_EVAL.value) eval_only: bool = Field(alias=Fields.EVAL_ONLY.value, default=False) + @model_validator(mode='after') + def validate_eval_fields(self): + if self.eval_only and not self.run_eval: + raise ValueError("eval-only requires run-eval=true") + return self + class WorkerConfig(BaseModel): """Pydantic model for validating worker configuration in multinode entries.""" @@ -134,7 +140,7 @@ class MultiNodeMatrixEntry(BaseModel): osl: int prefill: WorkerConfig decode: WorkerConfig - conc: List[int] + conc: List[int] = Field(min_length=1) max_model_len: int = Field(alias=Fields.MAX_MODEL_LEN.value) exp_name: str = Field(alias=Fields.EXP_NAME.value) disagg: bool @@ -145,6 +151,26 @@ class MultiNodeMatrixEntry(BaseModel): default=False, alias=Fields.EVAL_ALL_CONCS.value ) + @model_validator(mode='after') + def validate_eval_fields(self): + if any(conc <= 0 for conc in self.conc): + raise ValueError("conc entries must be greater than 0") + if len(set(self.conc)) != len(self.conc): + raise ValueError("conc entries must be unique") + if self.eval_only and not self.run_eval: + raise ValueError("eval-only requires run-eval=true") + if self.eval_conc is not None: + if not self.run_eval: + raise ValueError("eval-conc requires run-eval=true") + if self.eval_conc not in self.conc: + raise ValueError("eval-conc must be present in conc") + if self.eval_all_concs: + if not self.run_eval: + raise ValueError("eval-all-concs requires run-eval=true") + if self.eval_conc is not None: + raise ValueError("eval-all-concs cannot be combined with eval-conc") + return self + class SingleNodeAgenticMatrixEntry(BaseModel): """Pydantic model for validating single-node agentic coding matrix entries.""" @@ -273,6 +299,10 @@ def _validate_conc_fields(self): raise ValueError( f"Input '{Fields.CONC_LIST.value}' entries must be greater than 0." ) + if len(set(self.conc_list)) != len(self.conc_list): + raise ValueError( + f"Input '{Fields.CONC_LIST.value}' entries must be unique." + ) return self diff --git a/utils/test_collect_eval_results.py b/utils/test_collect_eval_results.py index 019bbdf12..f66b795aa 100644 --- a/utils/test_collect_eval_results.py +++ b/utils/test_collect_eval_results.py @@ -3,7 +3,11 @@ import json from pathlib import Path -from collect_eval_results import build_row, collect_eval_rows +from collect_eval_results import ( + build_row, + collect_eval_rows, + extract_lm_metrics, +) def test_build_row_preserves_sequence_lengths() -> None: @@ -23,6 +27,52 @@ def test_build_row_preserves_sequence_lengths() -> None: assert row["osl"] == 1024 +def test_build_row_uses_flexible_metric_as_primary_score() -> None: + row = build_row( + { + "infmax_model_prefix": "test", + "hw": "h100", + "framework": "vllm", + "precision": "fp8", + }, + { + "task": "gpqa_diamond_cot_n_shot", + "flex": 0.42, + "flex_se": 0.02, + }, + ) + + assert row["score"] == 0.42 + assert row["score_name"] == "em_flexible" + assert row["score_se"] == 0.02 + + +def test_extract_lm_metrics_supports_default_none_filter( + tmp_path: Path, +) -> None: + result_path = tmp_path / "results_accuracy.json" + result_path.write_text(json.dumps({ + "lm_eval_version": "0.4.9", + "results": { + "multiple_choice": { + "acc,none": 0.75, + "acc_stderr,none": 0.03, + }, + }, + "configs": { + "multiple_choice": { + "metric_list": [{"metric": "acc"}], + "filter_list": [], + }, + }, + })) + + metrics = extract_lm_metrics(result_path) + + assert metrics[0]["accuracy"] == 0.75 + assert metrics[0]["accuracy_se"] == 0.03 + + def _write_lm_eval_result(path: Path, score: float) -> None: path.write_text(json.dumps({ "lm_eval_version": "0.4.0", @@ -63,6 +113,7 @@ def test_collect_eval_rows_expands_batched_concurrencies( "decode_tp": 8, "decode_ep": 1, "decode_num_workers": 2, + "eval_exit_code": 0, "eval_concs": [4, 16], "completed_eval_concs": [4, 16], "failed_eval_concs": [], @@ -83,13 +134,14 @@ def test_collect_eval_rows_expands_batched_concurrencies( assert [row["score"] for row in rows] == [0.90, 0.91] -def test_collect_eval_rows_ignores_failed_batch_points( +def test_collect_eval_rows_rejects_failed_batch( tmp_path: Path, ) -> None: artifact_dir = tmp_path / "eval_batch" artifact_dir.mkdir() (artifact_dir / "meta_env.json").write_text(json.dumps({ "is_multinode": True, + "eval_exit_code": 1, "eval_concs": [4, 16], "completed_eval_concs": [4], "failed_eval_concs": [16], @@ -106,4 +158,59 @@ def test_collect_eval_rows_ignores_failed_batch_points( rows = collect_eval_rows(tmp_path) - assert [row["conc"] for row in rows] == [4] + assert rows == [] + + +def test_collect_eval_rows_ignores_failed_single_eval(tmp_path: Path) -> None: + artifact_dir = tmp_path / "eval_failed" + artifact_dir.mkdir() + (artifact_dir / "meta_env.json").write_text(json.dumps({ + "is_multinode": False, + "eval_exit_code": 7, + "conc": 4, + })) + _write_lm_eval_result(artifact_dir / "results_test.json", 0.99) + + assert collect_eval_rows(tmp_path) == [] + + +def test_collect_eval_rows_rejects_inconsistent_batch_metadata( + tmp_path: Path, +) -> None: + artifact_dir = tmp_path / "eval_batch" + artifact_dir.mkdir() + (artifact_dir / "meta_env.json").write_text(json.dumps({ + "is_multinode": True, + "eval_exit_code": 0, + "eval_concs": [4, 16], + "completed_eval_concs": [4, 16], + "failed_eval_concs": [16], + "conc": 4, + })) + _write_lm_eval_result( + artifact_dir / "results_test_conc4.json", + 0.90, + ) + + assert collect_eval_rows(tmp_path) == [] + + +def test_collect_eval_rows_rejects_incomplete_batch_results( + tmp_path: Path, +) -> None: + artifact_dir = tmp_path / "eval_batch" + artifact_dir.mkdir() + (artifact_dir / "meta_env.json").write_text(json.dumps({ + "is_multinode": True, + "eval_exit_code": 0, + "eval_concs": [4, 16], + "completed_eval_concs": [4, 16], + "failed_eval_concs": [], + "conc": 4, + })) + _write_lm_eval_result( + artifact_dir / "results_test_conc4.json", + 0.90, + ) + + assert collect_eval_rows(tmp_path) == [] diff --git a/utils/test_process_agentic_result.py b/utils/test_process_agentic_result.py index 38477b62a..e21e96d61 100644 --- a/utils/test_process_agentic_result.py +++ b/utils/test_process_agentic_result.py @@ -21,6 +21,8 @@ import pytest +from process_agentic_result import _HF_DATASET + PROCESSOR = Path(__file__).resolve().parent / "process_agentic_result.py" @@ -405,7 +407,8 @@ def test_processor_loads_traces_jsonl_for_theoretical_cache(tmp_path: Path): # Build a fake HF cache with traces.jsonl matching the conv_ids the # fixture references (trace-A, trace-B). hf_cache = tmp_path / "_hf" - snapshot = hf_cache / "datasets--semianalysisai--cc-traces-weka-042026" / "snapshots" / "abc" + org, name = _HF_DATASET.split("/", 1) + snapshot = hf_cache / f"datasets--{org}--{name}" / "snapshots" / "abc" snapshot.mkdir(parents=True) # Real corpus uses the ``out`` alias (Pydantic's external name for # output_length). Mix both to verify the loader accepts either. diff --git a/utils/test_validate_reusable_sweep_artifacts.py b/utils/test_validate_reusable_sweep_artifacts.py index 318724e4c..be10a5295 100644 --- a/utils/test_validate_reusable_sweep_artifacts.py +++ b/utils/test_validate_reusable_sweep_artifacts.py @@ -72,6 +72,8 @@ def single_eval_result( "dp_attention": False, "conc": conc, "task": "gsm8k", + "score": 0.95, + "score_name": "em_strict", } @@ -83,6 +85,7 @@ def single_eval_meta( ) -> dict: row = single_eval_result(conc, runner, isl, osl) row["infmax_model_prefix"] = row.pop("model_prefix") + row["eval_exit_code"] = 0 return row @@ -100,6 +103,9 @@ def write_raw_eval_artifact( (artifact_dir / "meta_env.json").write_text( json.dumps(single_eval_meta(conc, logical_runner, isl, osl)) ) + (artifact_dir / "results_test.json").write_text( + json.dumps({"lm_eval_version": "0.4.0", "results": {}}) + ) def multinode_eval_entry(concs: list[int]) -> dict: @@ -149,6 +155,8 @@ def multinode_eval_result(conc: int) -> dict: "decode_num_workers": 2, "conc": conc, "task": "gsm8k", + "score": 0.95, + "score_name": "em_strict", } @@ -163,7 +171,12 @@ def write_raw_batched_eval_artifact( meta["eval_concs"] = concs meta["completed_eval_concs"] = concs meta["failed_eval_concs"] = [] + meta["eval_exit_code"] = 0 (artifact_dir / "meta_env.json").write_text(json.dumps(meta)) + for conc in concs: + (artifact_dir / f"results_test_conc{conc}.json").write_text( + json.dumps({"lm_eval_version": "0.4.0", "results": {}}) + ) def single_fixed_entry(conc: int) -> dict: @@ -439,6 +452,97 @@ def test_eval_validation_expands_one_batched_multinode_artifact( assert validate_eval_artifacts(tmp_path, expected) == [] +def test_eval_validation_rejects_failed_raw_eval_status(tmp_path: Path) -> None: + config = {"evals": [single_eval_entry(32)], "multinode_evals": []} + write_eval_aggregate(tmp_path, [single_eval_result(32)]) + write_raw_eval_artifact(tmp_path, 32) + meta_path = next(tmp_path.glob("eval_*/meta_env.json")) + meta = json.loads(meta_path.read_text()) + meta["eval_exit_code"] = 7 + meta_path.write_text(json.dumps(meta)) + + errors = validate_eval_artifacts(tmp_path, expected_eval_keys(config)) + + assert any("eval_exit_code 7" in error for error in errors) + + +def test_eval_validation_rejects_incomplete_batch_metadata( + tmp_path: Path, +) -> None: + concs = [4, 16] + config = { + "evals": [], + "multinode_evals": [multinode_eval_entry(concs)], + } + write_eval_aggregate( + tmp_path, + [multinode_eval_result(conc) for conc in concs], + ) + write_raw_batched_eval_artifact(tmp_path, concs) + meta_path = tmp_path / "eval_gptoss_8k1k_batch" / "meta_env.json" + meta = json.loads(meta_path.read_text()) + meta["completed_eval_concs"] = [4] + meta["failed_eval_concs"] = [16] + meta_path.write_text(json.dumps(meta)) + + errors = validate_eval_artifacts(tmp_path, expected_eval_keys(config)) + + assert any("incomplete batched eval results" in error for error in errors) + + +def test_eval_validation_rejects_missing_raw_batch_result( + tmp_path: Path, +) -> None: + concs = [4, 16] + config = { + "evals": [], + "multinode_evals": [multinode_eval_entry(concs)], + } + write_eval_aggregate( + tmp_path, + [multinode_eval_result(conc) for conc in concs], + ) + write_raw_batched_eval_artifact(tmp_path, concs) + ( + tmp_path + / "eval_gptoss_8k1k_batch" + / "results_test_conc16.json" + ).unlink() + + errors = validate_eval_artifacts(tmp_path, expected_eval_keys(config)) + + assert any( + "missing result files for concurrency: 16" in error + for error in errors + ) + + +def test_eval_validation_rejects_missing_aggregate_score(tmp_path: Path) -> None: + config = {"evals": [single_eval_entry(32)], "multinode_evals": []} + result = single_eval_result(32) + result["score"] = None + write_eval_aggregate(tmp_path, [result]) + write_raw_eval_artifact(tmp_path, 32) + + errors = validate_eval_artifacts(tmp_path, expected_eval_keys(config)) + + assert any("contains invalid score None" in error for error in errors) + + +def test_eval_validation_rejects_below_threshold_aggregate_score( + tmp_path: Path, +) -> None: + config = {"evals": [single_eval_entry(32)], "multinode_evals": []} + result = single_eval_result(32) + result["score"] = 0.80 + write_eval_aggregate(tmp_path, [result]) + write_raw_eval_artifact(tmp_path, 32) + + errors = validate_eval_artifacts(tmp_path, expected_eval_keys(config)) + + assert any("is below 0.91 from models.gptoss" in error for error in errors) + + def test_eval_aggregate_validation_is_exact(tmp_path: Path) -> None: config = { "evals": [single_eval_entry(32)], diff --git a/utils/validate_reusable_sweep_artifacts.py b/utils/validate_reusable_sweep_artifacts.py index bbd64d174..ff97d923f 100644 --- a/utils/validate_reusable_sweep_artifacts.py +++ b/utils/validate_reusable_sweep_artifacts.py @@ -6,13 +6,23 @@ import argparse import csv import json +import math import sys from collections import Counter from pathlib import Path from typing import Any, Iterable +from evals.validate_scores import ( + load_config, + resolve_threshold, + validate_batch_manifest, +) + FIXED_SEQ_KEYS = ("1k1k", "8k1k") +EVAL_THRESHOLD_CONFIG = load_config( + str(Path(__file__).resolve().parent / "evals" / "thresholds.json") +) def as_bool(value: Any) -> bool: @@ -30,6 +40,23 @@ def as_int(value: Any, default: int = 0) -> int: return default +def valid_concurrency_list( + value: object, + *, + allow_empty: bool = True, +) -> bool: + """Return whether metadata contains unique positive integer concurrencies.""" + return ( + isinstance(value, list) + and (allow_empty or bool(value)) + and all( + isinstance(conc, int) and not isinstance(conc, bool) and conc > 0 + for conc in value + ) + and len(set(value)) == len(value) + ) + + def load_json(path: Path) -> Any: """Load a JSON file.""" with open(path) as handle: @@ -512,6 +539,40 @@ def eval_key(row: dict[str, Any]) -> tuple[Any, ...]: ) +def validate_eval_score(path: Path, row: dict[str, Any]) -> list[str]: + """Validate one aggregate eval row's primary score.""" + task = row.get("task") + score = row.get("score") + if not isinstance(task, str) or not task: + return [f"eval aggregate {path.name!r} contains a row without a task"] + if ( + not isinstance(score, (int, float)) + or isinstance(score, bool) + or not math.isfinite(score) + or not 0 <= score <= 1 + ): + return [ + f"eval aggregate {path.name!r} contains invalid score " + f"{score!r} for task {task!r}" + ] + + prefix = row.get("model_prefix", row.get("infmax_model_prefix")) + if not isinstance(prefix, str) or not prefix: + prefix = None + threshold, source = resolve_threshold( + EVAL_THRESHOLD_CONFIG, + prefix, + task, + 0.85, + ) + if score < threshold: + return [ + f"eval aggregate {path.name!r} score {score:.4f} for task " + f"{task!r} is below {threshold} from {source}" + ] + return [] + + def raw_eval_artifact_dirs(artifacts_dir: Path) -> list[Path]: """Return raw eval result artifacts, excluding aggregate and debug artifacts.""" return sorted( @@ -552,19 +613,87 @@ def raw_eval_key_rows( "meta_env.json" ) continue - eval_concs = meta.get("completed_eval_concs") - if isinstance(meta.get("eval_concs"), list): - if not isinstance(eval_concs, list): + + eval_exit_code = meta.get("eval_exit_code") + if ( + not isinstance(eval_exit_code, int) + or isinstance(eval_exit_code, bool) + or eval_exit_code != 0 + ): + errors.append( + f"raw eval artifact {artifact_dir.name!r} records failed " + f"eval_exit_code {eval_exit_code!r}" + ) + continue + + metadata_concs = meta.get("eval_concs") + if "eval_concs" in meta: + completed_concs = meta.get("completed_eval_concs") + failed_concs = meta.get("failed_eval_concs") + if not ( + valid_concurrency_list(metadata_concs, allow_empty=False) + and valid_concurrency_list(completed_concs) + and valid_concurrency_list(failed_concs) + ): errors.append( f"raw eval artifact {artifact_dir.name!r} has invalid " "batched concurrency metadata" ) continue + + metadata_set = set(metadata_concs) + completed_set = set(completed_concs) + failed_set = set(failed_concs) + if failed_set or completed_set != metadata_set: + errors.append( + f"raw eval artifact {artifact_dir.name!r} has incomplete " + "batched eval results" + ) + continue + manifest_errors = validate_batch_manifest( + str(meta_path), + [ + str(result_path) + for result_path in artifact_dir.glob("results*.json") + ], + expected_concs=metadata_concs, + ) + if manifest_errors: + errors.extend( + f"raw eval artifact {artifact_dir.name!r}: {error}" + for error in manifest_errors + ) + continue rows.extend( eval_key({**meta, "conc": eval_conc}) - for eval_conc in eval_concs + for eval_conc in completed_concs ) else: + conc = meta.get("conc") + if ( + not isinstance(conc, int) + or isinstance(conc, bool) + or conc <= 0 + ): + errors.append( + f"raw eval artifact {artifact_dir.name!r} has invalid " + f"concurrency {conc!r}" + ) + continue + manifest_errors = validate_batch_manifest( + str(meta_path), + [ + str(result_path) + for result_path in artifact_dir.glob("results*.json") + ], + expected_concs=[conc], + ) + if manifest_errors: + errors.extend( + f"raw eval artifact {artifact_dir.name!r}: {error}" + for error in manifest_errors + ) + continue rows.append(eval_key(meta)) return rows, errors @@ -597,6 +726,9 @@ def validate_eval_artifacts( for row in data if isinstance(row, dict) ) + for row in data: + if isinstance(row, dict): + errors.extend(validate_eval_score(path, row)) if row_count == 0: errors.append("eval_results_all contains no rows") errors.extend(