
fix: HIGH severity bugs in checkpointing I/O timing, MPI collection, and submission validation #397

Open
FileSystemGuy wants to merge 7 commits into main from FileSystemGuy-SC-high

@FileSystemGuy
Contributor

Summary

This PR fixes 11 HIGH severity bugs identified in a broad codebase scan (BROAD_SCAN.md). The bugs fall into three subsystems: checkpointing I/O timing (SC-1 through SC-5), MPI cluster collection (MPI-1), and submission checker validation (RULES-1 through RULES-5). All changes are targeted fixes with no refactoring.


Checkpointing / Streaming Checkpoint (SC-1 through SC-5)

SC-1 — Async S3 I/O time invisible to reported metric

Files: streaming_checkpoint.py, minio_writer.py, s3torch_writer.py

For MinIO and s3torchconnector backends, write_chunk() returns before ThreadPoolExecutor UploadPart tasks complete. The per-chunk timer accumulated only buffer-handoff microseconds, not actual network transmission time. Result: io_throughput was overstated for async backends.

Fix: Capture io_wall_start/io_wall_end around the full write+close pipeline. Report io_wall_time as the throughput denominator; retain the old accumulated value as io_accumulated_time for diagnostics.
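
The gap between the two timers can be illustrated with a minimal sketch. It assumes a hypothetical writer whose write_chunk() queues work on a ThreadPoolExecutor and returns immediately; FakeAsyncWriter, measure(), and the 50 ms simulated upload are illustrative names and values, not the project's code.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class FakeAsyncWriter:
    """Hypothetical stand-in for an async S3 writer (not the real MinIO writer)."""

    def __init__(self, upload_seconds=0.05):
        self._pool = ThreadPoolExecutor(max_workers=4)
        self._futures = []
        self._upload_seconds = upload_seconds

    def write_chunk(self, chunk: bytes) -> None:
        # Returns as soon as the task is queued, before the "upload" finishes.
        self._futures.append(self._pool.submit(time.sleep, self._upload_seconds))

    def close(self) -> None:
        # Blocks until every queued upload has completed.
        for fut in self._futures:
            fut.result()
        self._pool.shutdown()


def measure(writer, chunks):
    io_accumulated_time = 0.0
    io_wall_start = time.perf_counter()
    for chunk in chunks:
        t0 = time.perf_counter()
        writer.write_chunk(chunk)
        io_accumulated_time += time.perf_counter() - t0  # handoff cost only
    writer.close()
    io_wall_time = time.perf_counter() - io_wall_start  # includes the uploads
    return io_accumulated_time, io_wall_time


accumulated, wall = measure(FakeAsyncWriter(), [b"x" * 1024] * 4)
```

In this sketch the accumulated value covers only the time spent queuing work, while the wall-clock value also covers the simulated uploads that finish during close().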

SC-2 — memcpy inside timed I/O window inflates CPU time in metric

Files: streaming_checkpoint.py, minio_writer.py, s3torch_writer.py

bytes(buffer[:size]), a full 32 MB memcpy, executed inside the io_start/io_stop window in _writer_process(). This added ~10–50 ms of CPU-bound time per chunk to reported I/O time, so the copy was counted as I/O and the reported I/O throughput was understated.

Fix: Move chunk_bytes = bytes(shm.buf[:nbytes]) to before io_start in the caller. Remove the internal copy from minio_writer.write_chunk() and s3torch_writer.write_chunk(); update their parameter types from memoryview to bytes.
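
A minimal sketch of the reordering, assuming a hypothetical helper timed_write() and a plain memoryview standing in for shm.buf; the real _writer_process() is not reproduced here.

```python
import time


def timed_write(shm_buf, nbytes, write_chunk):
    """Copy first, then time only the write call."""
    chunk_bytes = bytes(shm_buf[:nbytes])  # copy happens outside the timed window
    io_start = time.perf_counter()
    write_chunk(chunk_bytes)               # writer receives bytes and makes no copy
    return time.perf_counter() - io_start


received = []
shared = memoryview(bytearray(b"abcdefgh"))  # stand-in for shm.buf
elapsed = timed_write(shared, 4, received.append)
```

The writer callback only ever sees an immutable bytes object, which is why the parameter type can change from memoryview to bytes.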

SC-3 — total_time includes process setup overhead

File: streaming_checkpoint.py

start_time was captured before buffer pool creation (up to 2 GB shared memory), generator init, IPC queue/event creation, and process fork — all of which can take seconds. pipeline_overhead_pct consequently attributed process initialization cost as pipeline stall time.

Fix: Split into setup_start (original position) and pipeline_start (after writer_proc.start()). total_time now measures the data pipeline only; setup_time is included in results for diagnostic visibility.
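
The split can be sketched as follows. timed_save() and its two callables are hypothetical placeholders for the setup work (buffer pool, queues, fork) and the data pipeline; only the names setup_start, pipeline_start, setup_time, and total_time come from the description above.

```python
import time


def timed_save(setup, pipeline):
    setup_start = time.perf_counter()
    setup()                               # buffer pool, IPC objects, process start
    pipeline_start = time.perf_counter()  # taken only after setup has finished
    pipeline()                            # generate + write
    pipeline_end = time.perf_counter()
    return {
        "setup_time": pipeline_start - setup_start,
        "total_time": pipeline_end - pipeline_start,  # data pipeline only
    }


results = timed_save(lambda: time.sleep(0.05), lambda: time.sleep(0.01))
```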

SC-4 — gen_throughput and io_throughput use different byte denominators

File: streaming_checkpoint.py

gen_throughput used total_size_bytes (requested) while io_throughput used stats['total_bytes'] (actual written). throughput_ratio = gen/io compared different quantities, producing a meaningless ratio when the values diverged.

Fix: Use stats['total_bytes'] for both numerators. Emit a warning when requested and actual counts differ.
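
A minimal sketch of the corrected arithmetic, assuming a hypothetical helper compute_throughputs() that receives the timings directly; the real _format_results() signature is not shown in this PR description.

```python
import warnings

GIB = 1024 ** 3


def compute_throughputs(requested_bytes, actual_bytes, gen_time, io_time):
    if requested_bytes != actual_bytes:
        warnings.warn(
            f"requested {requested_bytes} bytes but wrote {actual_bytes} bytes"
        )
    actual_gib = actual_bytes / GIB          # one numerator for both rates
    gen_throughput = actual_gib / gen_time
    io_throughput = actual_gib / io_time
    return gen_throughput, io_throughput, gen_throughput / io_throughput


# Emits the mismatch warning: 4 GiB requested, 3 GiB actually written.
gen_tp, io_tp, ratio = compute_throughputs(4 * GIB, 3 * GIB, gen_time=1.0, io_time=2.0)
```

Because both rates share the same byte count, the ratio reduces to io_time / gen_time and no longer depends on the requested size.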

SC-5 — Parallel reader backend_stats discards all-but-last reader

File: streaming_checkpoint.py

In the finally block of load()'s parallel reader, backend_stats = r.close() was assigned inside a loop over all readers, so each iteration overwrote the previous value. All readers except the last had their backend stats (S3 request counts, errors, retries) silently discarded.

Fix: Accumulate into all_backend_stats = [] before the loop; return the full list.
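
The accumulation pattern, sketched with a hypothetical FakeReader whose close() returns a stats dict; the shape of the real backend stats is an assumption.

```python
class FakeReader:
    """Hypothetical reader; close() returns its backend stats."""

    def __init__(self, requests):
        self._requests = requests

    def close(self):
        return {"requests": self._requests}


def close_all(readers):
    all_backend_stats = []          # created before the loop
    for r in readers:
        all_backend_stats.append(r.close())  # keep every reader's stats
    return all_backend_stats


stats = close_all([FakeReader(3), FakeReader(5)])
```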


MPI / Cluster Collection (MPI-1)

MPI-1 — Non-zero mpirun exit returns partial data as if collection succeeded

File: cluster_collector.py

When mpirun exits non-zero (one or more ranks failed), the code logged a warning and returned the partial collected_data dict. Downstream validation (ClusterInformation.validate_cluster_consistency()) received no signal that data was missing and silently validated an incomplete cluster view.

Fix: Replace the warning-and-return with raise RuntimeError(...) that includes the exit code, partial host count, and stderr. The caller in base.py already handles collection failures gracefully.
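
A minimal sketch of the fail-fast behaviour, assuming a hypothetical helper check_mpirun_result() that is handed the exit code, the partial results, and stderr; the actual code in cluster_collector.py and the exact message wording may differ.

```python
def check_mpirun_result(returncode, collected_data, stderr):
    """Raise instead of returning partial data when mpirun fails."""
    if returncode != 0:
        raise RuntimeError(
            f"mpirun exited with code {returncode}; "
            f"{len(collected_data)} host(s) reported; "
            f"stderr: {stderr.strip()}"
        )
    return collected_data
```

A caller that already wraps collection in try/except, as base.py is described as doing, can then log the failure and continue without cluster data.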


Rules / Submission Checker (RULES-1 through RULES-5)

RULES-1 — 500-steps dataset minimum formula is circular; check never fires

File: submission_checker/checks/training_checks.py

num_steps_per_epoch was derived from the actual file count using max(MIN_STEPS_PER_EPOCH, actual_steps), then multiplied back. Because actual_steps >= itself, min_samples_steps was always ≥ actual samples — the constraint never produced a "too few files" error.

Fix: Replace with the direct formula min_samples_steps = MIN_STEPS_PER_EPOCH * batch_size * num_accelerators, matching rules/utils.py.
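
As a worked example of the direct formula, the sketch below takes MIN_STEPS_PER_EPOCH = 500 from the rule's name; has_enough_samples() is a hypothetical helper, and the direction of the comparison is an assumption.

```python
MIN_STEPS_PER_EPOCH = 500


def min_samples_for_steps(batch_size, num_accelerators):
    # Depends only on the run configuration, never on the actual file count.
    return MIN_STEPS_PER_EPOCH * batch_size * num_accelerators


def has_enough_samples(num_samples, batch_size, num_accelerators):
    return num_samples >= min_samples_for_steps(batch_size, num_accelerators)


required = min_samples_for_steps(batch_size=4, num_accelerators=8)  # 500 * 4 * 8
```

With a batch size of 4 on 8 accelerators the minimum is 16,000 samples, so a dataset of 15,999 samples fails the check.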

RULES-2 — Multi-host memory total wrong for mixed-memory clusters

File: submission_checker/checks/training_checks.py

host_memory_GB is a per-host list. The checker took only [0] and multiplied by num_hosts, giving an incorrect total whenever hosts have different memory capacities.

Fix: Replace with total_host_memory = sum(summary.get("host_memory_GB", [0])), matching rules/utils.py.
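
A small illustration of how the two computations diverge on a mixed-memory cluster. The summary dict and its num_hosts key are made up for the example.

```python
# Hypothetical summary for three hosts with different memory sizes.
summary = {"host_memory_GB": [512, 256, 256], "num_hosts": 3}

# Old pattern: first host's memory multiplied by the host count.
old_total = summary["host_memory_GB"][0] * summary["num_hosts"]

# Fixed pattern: sum the per-host list, defaulting to [0] when absent.
total_host_memory = sum(summary.get("host_memory_GB", [0]))
```

Here the old pattern reports 1536 GB while the true total is 1024 GB.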

RULES-3 — NameError in subset-mode process count check; check silently passes

File: submission_checker/checks/checkpointing_checks.py

model_key was referenced in the if checkpoint_mode == "subset": branch but was only assigned in the else: branch. The NameError was silently swallowed, causing CLOSED subset-mode submissions with the wrong process count to pass validation unconditionally.

Fix: Replace model_key with model_name (assigned unconditionally from metadata) in the error log.
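
The failure mode can be reproduced in isolation. The sketch below is a hypothetical reduction of the check, not the code in checkpointing_checks.py: the function names, the error text, and the way model_key is derived are all assumptions.

```python
def buggy_check(checkpoint_mode, model_name, num_processes, expected):
    """Mirrors the described bug: model_key is unbound in the subset branch."""
    errors = []
    try:
        if checkpoint_mode == "subset":
            if num_processes != expected:
                # model_key is only assigned in the else branch, so this line
                # raises UnboundLocalError (a subclass of NameError).
                errors.append(f"{model_key}: expected {expected} processes")
        else:
            model_key = model_name.upper()  # hypothetical derivation
            if num_processes != expected:
                errors.append(f"{model_key}: expected {expected} processes")
    except Exception:
        pass  # the outer handler hides the NameError
    return errors


def fixed_check(checkpoint_mode, model_name, num_processes, expected):
    errors = []
    if checkpoint_mode == "subset" and num_processes != expected:
        errors.append(f"{model_name}: expected {expected} processes")
    return errors


buggy = buggy_check("subset", "llama3-8b", 4, 8)   # no error is recorded
fixed = fixed_check("subset", "llama3-8b", 4, 8)   # the mismatch is reported
```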

RULES-4 — AU check reads non-existent DLIO fields; every submission fails

File: submission_checker/checks/training_checks.py

train_au_mean_percentage and train_au_meet_expectation do not exist in DLIO output. Both .get() calls always returned defaults (0 and ""), making au_expectation != "success" permanently True. Every training submission was flagged as an AU failure regardless of actual utilization.

Fix: Read the real field train_au_percentage (a list), compute its mean, and compare against the 90% MLPerf Storage minimum (Rules.md §3.3.2).
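
A minimal sketch of the corrected check. It assumes train_au_percentage is available directly on the dict passed in, that an empty or missing list counts as a failure, and that the comparison is inclusive; au_passes() and AU_MINIMUM_PCT are hypothetical names.

```python
from statistics import mean

AU_MINIMUM_PCT = 90.0  # the 90% minimum cited above


def au_passes(summary):
    values = summary.get("train_au_percentage", [])
    if not values:
        return False  # assumption: no AU data is treated as a failure
    return mean(values) >= AU_MINIMUM_PCT


ok = au_passes({"train_au_percentage": [95.0, 91.0]})   # mean 93.0
low = au_passes({"train_au_percentage": [95.0, 80.0]})  # mean 87.5
```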

RULES-5 — KeyError on parquet datasets in legacy rules path

File: rules_legacy.py

Direct dataset_params['record_length_bytes'] raised KeyError for parquet datasets where this key is absent. Parquet workloads routed through the legacy verifier crashed rather than validating.

Fix: Use .get() with a warning when absent, set file_size_bytes = 0, and guard min_num_files_by_bytes against division by zero. Falls back to the samples-based minimum, matching the active rules path.
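
The guard can be sketched as below. The helper name min_files_by_bytes() and the way file_size_bytes is derived from record_length_bytes (multiplied by a hypothetical num_samples_per_file) are assumptions; only the .get() call and the guarded division follow the description.

```python
import logging

logger = logging.getLogger(__name__)


def min_files_by_bytes(dataset_params, dataset_size_bytes, num_samples_per_file=1):
    record_length_bytes = dataset_params.get("record_length_bytes")
    if not record_length_bytes:
        logger.warning("record_length_bytes absent or zero; skipping bytes-based minimum")
        file_size_bytes = 0
    else:
        # Assumed derivation of the per-file size.
        file_size_bytes = record_length_bytes * num_samples_per_file
    # Guard against division by zero when the file size is unknown.
    return (dataset_size_bytes // file_size_bytes) if file_size_bytes else 0


parquet_min = min_files_by_bytes({}, 10**9)
npz_min = min_files_by_bytes({"record_length_bytes": 1000}, 10_000, 10)
```

A result of 0 leaves the samples-based minimum as the only effective constraint.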


Test plan

  • Run existing unit tests: pytest tests/unit -v
  • Verify checkpointing benchmark runs without error on file backend (SC-3 setup_time in results)
  • Verify training submission checker correctly rejects datasets below the 500-step minimum (RULES-1)
  • Verify training submission checker correctly computes AU pass/fail from train_au_percentage (RULES-4)
  • Verify parquet training submission does not crash legacy rules path (RULES-5)
  • Verify MPI collection failure raises RuntimeError rather than returning partial data (MPI-1)

…point.py

SC-1 — Async S3 I/O time invisible to metric
For MinIO and s3torchconnector backends, write_chunk() returns before
the ThreadPoolExecutor UploadPart tasks complete. Total I/O time was
accumulated as a sum of buffer-fill microseconds, missing the actual
network transmission that happens in parallel. Fixed by capturing
io_wall_start before the write loop and io_wall_end after close(),
then storing io_wall_time in the stats dict. _format_results() now
uses io_wall_time as the io_throughput denominator (falling back to
io_accumulated_time for synchronous backends where they are equal).
The old accumulated metric is retained as io_accumulated_time for
diagnostic visibility.

SC-2 — memcpy inside timed I/O window inflates CPU time in I/O metric
bytes(shm.buf[:nbytes]) — a full memcpy of up to 32 MB — was executed
inside the per-chunk io_start/io_stop window in _writer_process(). For
a 32 MB chunk this adds ~10–50 ms of pure CPU time to reported I/O
time. Fixed by moving chunk_bytes = bytes(shm.buf[:nbytes]) to before
io_start, then passing the already-converted bytes object to
write_chunk(). The write_chunk() implementations in minio_writer.py and
s3torch_writer.py are updated separately (see their commits).

SC-3 — total_time includes process setup; pipeline_overhead_pct is misleading
start_time was captured at the top of save() before buffer pool
creation (up to 2 GB of shared memory), generator init, IPC queue/
event creation, and process fork. These can take seconds for large
checkpoints. Fixed by splitting into setup_start (original position)
and pipeline_start (immediately after writer_proc.start()). The
total_time returned by _format_results() now measures the data
pipeline only. setup_time is passed through and included in the
results dict for diagnostic visibility.

SC-4 — gen_throughput and io_throughput use different byte counts
gen_throughput used total_size_bytes (requested size) while
io_throughput used stats['total_bytes'] (actual bytes written). When
these differ (truncated last chunk, backend short-writes),
throughput_ratio = gen/io compares rates computed over different byte
counts and is not meaningful.
Fixed by computing actual_bytes_gb = stats['total_bytes'] / 1024**3
once and using it for both numerators. A warning is emitted when
requested and actual byte counts differ.

SC-5 — parallel reader backend_stats discards all-but-last reader's stats
In the load() parallel reader finally block, backend_stats was assigned
inside a for loop over readers, overwriting on every iteration. All
readers except the last had their backend stats (S3 request counts,
errors, retries) silently discarded. Fixed by accumulating into
all_backend_stats = [] before the loop and returning the full list.
…ter.py

SC-1 — Async upload latency not captured in io_time
The MinIO multipart upload writer uses a ThreadPoolExecutor to send
UploadPart requests in parallel. write_chunk() returns as soon as the
part buffer is flushed to the executor queue, before the actual S3
network round-trips complete. This caused the accumulated io_time in
streaming_checkpoint.py to reflect only buffer-management overhead
(microseconds per chunk) rather than true network I/O time. The fix in
streaming_checkpoint.py (SC-1) switches to a wall-clock window that
captures the full upload pipeline duration; this file required no
additional changes for SC-1 but is listed as affected because its
async behaviour is the root cause.

SC-2 — bytes() memcpy inside write_chunk() added CPU time to I/O metric
data = bytes(buffer[:size]) performed a full copy of up to 32 MB of
data inside write_chunk(), which was called from inside the timed I/O
window in streaming_checkpoint._writer_process(). This attributed
~10–50 ms of CPU memcpy time per chunk to reported I/O time, so the
reported I/O throughput was understated. Fixed by:
- Changing the buffer parameter type from memoryview to bytes
- Removing the data = bytes(buffer[:size]) internal copy
- Replacing all references to data in the method body with buffer
  (a bytes object supports the same slicing used downstream)
The conversion now happens in streaming_checkpoint.py before io_start,
outside the timed window.
…riter.py

SC-1 — Async upload latency not captured in io_time
The s3torchconnector writer streams data to S3 via write(), which
queues bytes internally and returns before the network transmission
completes. write_chunk() therefore returns immediately after handing
off the buffer, so the per-chunk timer in streaming_checkpoint.py
captured only the handoff cost (microseconds), not the actual upload
latency. The wall-clock io_wall_time fix in streaming_checkpoint.py
(SC-1) addresses the measurement; this file is listed as affected
because its async behaviour is the root cause.

SC-2 — bytes() memcpy inside write_chunk() added CPU time to I/O metric
data = bytes(buffer[:size]) performed a full in-process copy of up to
32 MB inside write_chunk(), inside the timed I/O window. For a 32 MB
chunk this is ~10–50 ms of pure CPU memcpy attributed as I/O time,
so the reported I/O throughput was understated. Fixed by:
- Changing the buffer parameter type annotation from memoryview to bytes
- Removing the data = bytes(buffer[:size]) internal copy line
- Passing buffer directly to self.writer.write()
The conversion now happens in streaming_checkpoint.py before io_start,
outside the timed window.
…l data

When mpirun exits with a non-zero return code, one or more ranks failed
and the collected_data dict may reflect only the ranks that completed
before the failure. The previous code logged a warning and then returned
that partial data as if collection had succeeded. Downstream validation
(ClusterInformation.validate_cluster_consistency()) received no signal
that data was missing and silently validated an incomplete cluster view.

Fixed by replacing the warning-and-continue block with a RuntimeError
that includes the exit code, the count of hosts that did report, and
mpirun's stderr. The caller in base.py already wraps collect_cluster_info()
in a try/except that handles collection failures gracefully (logs a
warning and continues without cluster data), so no additional call-site
changes are needed.
…ode check

In the CLOSED subset-mode MPI process count check, model_key was
referenced in the if branch error log but was only assigned inside the
else branch (after a successful regex match on model_size). At runtime,
entering the if branch with checkpoint_mode == "subset" always raised
NameError before the error message could be emitted. The exception was
silently swallowed by the outer try/except, causing the check to pass
unconditionally for CLOSED subset-mode submissions with the wrong
process count.

Fixed by replacing model_key with model_name in the error log at the
affected line. model_name is assigned unconditionally earlier in the
function from metadata.get("args", {}).get("model", "").lower() and
is the correct human-readable identifier to include in the error
message. The model_key assignment in the else branch is unchanged and
still used correctly in that branch.
…n training_checks.py

RULES-1 — 500-steps dataset minimum formula is circular; check never fires
The formula derived num_steps_per_epoch from the actual file count
using max(MIN_STEPS_PER_EPOCH, actual_steps), then multiplied back to
get min_samples_steps. Because actual_steps >= itself, min_samples_steps
was always >= the actual sample count, so the 500-steps constraint never
produced a "too few files" error regardless of actual dataset size.
Fixed by replacing the two-step calculation with the direct formula:
  min_samples_steps = MIN_STEPS_PER_EPOCH * batch_size * num_accelerators
This matches the canonical computation in rules/utils.py.

RULES-2 — Multi-host memory total wrong for mixed-memory clusters
host_memory_GB is a list with one entry per host, but the checker took
only the first element ([0]) and multiplied by num_hosts. For clusters
where hosts have different memory sizes, this produced an incorrect
total that could be larger or smaller than the true sum. Fixed by
replacing the [0]-index-and-multiply pattern with:
  total_host_memory = sum(summary.get("host_memory_GB", [0]))
This matches the canonical pattern in rules/utils.py which sums the
per-host memory list.

RULES-4 — AU check reads non-existent DLIO fields; every submission fails
The accelerator utilization check looked up train_au_mean_percentage
and train_au_meet_expectation from the DLIO summary JSON. Neither field
exists in actual DLIO output (the real field is train_au_percentage, a
list of per-epoch AU values). Both .get() calls always returned their
defaults (0 and "" respectively), causing au_expectation != "success"
to be permanently True. Every training submission was flagged as an AU
failure regardless of actual utilization, making real failures and
valid runs indistinguishable. Fixed by reading train_au_percentage,
computing its mean, and comparing against the 90% MLPerf Storage
minimum (Rules.md §3.3.2).
…acy.py

Direct dict access dataset_params['record_length_bytes'] raised KeyError
for parquet-format datasets where this key is absent. The active code
path in rules/utils.py handles parquet correctly, but the legacy verifier
still routes training runs through TrainingRunRulesChecker, which called
this function. Parquet workloads using the legacy path crashed with
KeyError rather than validating.

Fixed by replacing the direct access with .get():
  record_length_bytes = dataset_params.get('record_length_bytes')
When record_length_bytes is absent or zero, a warning is logged and
file_size_bytes is set to 0. The subsequent min_num_files_by_bytes
calculation is guarded against division by zero:
  min_num_files_by_bytes = (dataset_size_bytes // file_size_bytes) if file_size_bytes else 0
With min_num_files_by_bytes = 0, required_file_count falls back to the
samples-based minimum, which matches the parquet behaviour in the active
rules path.
@FileSystemGuy requested a review from a team on May 27, 2026 02:50
@github-actions

MLCommons CLA bot: All contributors have signed the MLCommons CLA ✍️ ✅

@xanturing

@FileSystemGuy

I ran the checkpointing benchmark on my local repository after merging this PR and noticed a significant write performance regression.

Benchmark command:

./mlpstorage checkpointing run \
  --model llama3-8b \
  --hosts 192.168.255.82 \
  --num-processes 8 \
  --num-checkpoints-read 0 \
  --num-checkpoints-write 1 \
  --checkpoint-folder /mnt/pfs/checkpoint \
  --results-dir /home/mlperf/results \
  --client-host-memory-in-gb 503 \
  --exec-type mpi \
  --closed \
  --allow-run-as-root

After merging PR #397:

| Run | Duration | Throughput |
| --- | --- | --- |
| run1 | 10.3330s | 10.1330 GiB/s |
| run2 | 10.3063s | 10.1593 GiB/s |
| run3 | 10.5496s | 9.9250 GiB/s |

Before merging PR #397:

| Run | Duration | Throughput |
| --- | --- | --- |
| run1 | 1.9048s | 54.9698 GiB/s |
| run2 | 2.0252s | 51.7012 GiB/s |
| run3 | 1.9346s | 54.1231 GiB/s |

Summary: Checkpoint write throughput dropped from ~54 GiB/s to ~10 GiB/s (~5x regression), and duration increased from ~2s to ~10.4s. Could you take a look at what might be causing this? Happy to provide additional debugging info if needed.

@FileSystemGuy
Contributor Author

Excellent!! Thank you! This PR (like the others I recently submitted) was a result of asking Claude for a broad review of the code looking for bugs. It very much needs code review and performance testing before we accept it, so thank you for helping!

2 participants