fix: HIGH severity bugs in checkpointing I/O timing, MPI collection, and submission validation #397
FileSystemGuy wants to merge 7 commits into
…point.py

SC-1: Async S3 I/O time invisible to metric

For MinIO and s3torchconnector backends, write_chunk() returns before the ThreadPoolExecutor UploadPart tasks complete. Total I/O time was accumulated as a sum of buffer-fill microseconds, missing the actual network transmission that happens in parallel.

Fixed by capturing io_wall_start before the write loop and io_wall_end after close(), then storing io_wall_time in the stats dict. _format_results() now uses io_wall_time as the io_throughput denominator (falling back to io_accumulated_time for synchronous backends where they are equal). The old accumulated metric is retained as io_accumulated_time for diagnostic visibility.

SC-2: memcpy inside timed I/O window inflates CPU time in I/O metric

bytes(shm.buf[:nbytes]), a full memcpy of up to 32 MB, was executed inside the per-chunk io_start/io_stop window in _writer_process(). For a 32 MB chunk this adds ~10–50 ms of pure CPU time to reported I/O time.

Fixed by moving chunk_bytes = bytes(shm.buf[:nbytes]) to before io_start, then passing the already-converted bytes object to write_chunk(). The write_chunk() implementations in minio_writer.py and s3torch_writer.py are updated separately (see their commits).

SC-3: total_time includes process setup; pipeline_overhead_pct is misleading

start_time was captured at the top of save() before buffer pool creation (up to 2 GB of shared memory), generator init, IPC queue/event creation, and process fork. These can take seconds for large checkpoints.

Fixed by splitting into setup_start (original position) and pipeline_start (immediately after writer_proc.start()). The total_time returned by _format_results() now measures the data pipeline only. setup_time is passed through and included in the results dict for diagnostic visibility.

SC-4: gen_throughput and io_throughput use different byte counts

gen_throughput used total_size_bytes (requested size) while io_throughput used stats['total_bytes'] (actual bytes written). When these differ (truncated last chunk, backend short-writes), throughput_ratio = gen/io is not a like-for-like comparison.

Fixed by computing actual_bytes_gb = stats['total_bytes'] / 1024**3 once and using it for both numerators. A warning is emitted when requested and actual byte counts differ.

SC-5: Parallel reader backend_stats discards all-but-last reader's stats

In the load() parallel reader finally block, backend_stats was assigned inside a for loop over readers, overwriting on every iteration. All readers except the last had their backend stats (S3 request counts, errors, retries) silently discarded.

Fixed by initializing all_backend_stats = [] before the loop, appending each reader's stats to it, and returning the full list.
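The SC-5 overwrite-versus-accumulate difference can be illustrated with a small standalone sketch. `FakeReader` and both helper functions are hypothetical stand-ins written for this example; they are not the code in streaming_checkpoint.py, and the stats keys are assumed.

```python
class FakeReader:
    """Hypothetical stand-in for one parallel reader; close() returns its backend stats."""

    def __init__(self, requests, retries):
        self._stats = {"requests": requests, "retries": retries}

    def close(self):
        return self._stats


def close_readers_overwriting(readers):
    """Pattern described as the bug: each iteration replaces the previous stats."""
    backend_stats = None
    for r in readers:
        backend_stats = r.close()
    return backend_stats


def close_readers_accumulating(readers):
    """Pattern described as the fix: keep one stats entry per reader."""
    all_backend_stats = []
    for r in readers:
        all_backend_stats.append(r.close())
    return all_backend_stats
```

With two readers, the first helper returns only the second reader's dict, while the second returns a list containing both.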
…ter.py

SC-1: Async upload latency not captured in io_time

The MinIO multipart upload writer uses a ThreadPoolExecutor to send UploadPart requests in parallel. write_chunk() returns as soon as the part buffer is flushed to the executor queue, before the actual S3 network round-trips complete. This caused the accumulated io_time in streaming_checkpoint.py to reflect only buffer-management overhead (microseconds per chunk) rather than true network I/O time.

The fix in streaming_checkpoint.py (SC-1) switches to a wall-clock window that captures the full upload pipeline duration. This file required no additional changes for SC-1 but is listed as affected because its async behaviour is the root cause.

SC-2: bytes() memcpy inside write_chunk() added CPU time to I/O metric

data = bytes(buffer[:size]) performed a full copy of up to 32 MB of data inside write_chunk(), which was called from inside the timed I/O window in streaming_checkpoint._writer_process(). This attributed ~10–50 ms of CPU memcpy time per chunk to reported I/O time, which understated I/O throughput.

Fixed by:
- Changing the buffer parameter type from memoryview to bytes
- Removing the data = bytes(buffer[:size]) internal copy
- Replacing all references to data in the method body with buffer (a bytes object supports the same slicing used downstream)

The conversion now happens in streaming_checkpoint.py before io_start, outside the timed window.
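The gap between the accumulated per-chunk timer and the wall-clock window described under SC-1 can be reproduced with a toy async writer. This is a minimal sketch: `AsyncWriter` is a hypothetical class that simulates an upload with `time.sleep`, and `timed_write` is an assumed helper, not the project's `_writer_process()`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class AsyncWriter:
    """Hypothetical writer whose write_chunk() queues work and returns immediately."""

    def __init__(self, upload_seconds):
        self._pool = ThreadPoolExecutor(max_workers=4)
        self._futures = []
        self._upload_seconds = upload_seconds

    def write_chunk(self, data):
        # The sleep stands in for a network upload running on a worker thread.
        self._futures.append(self._pool.submit(time.sleep, self._upload_seconds))

    def close(self):
        # Block until every queued upload has finished.
        for f in self._futures:
            f.result()
        self._pool.shutdown()


def timed_write(writer, chunks):
    io_accumulated_time = 0.0
    io_wall_start = time.perf_counter()
    for chunk in chunks:
        t0 = time.perf_counter()
        writer.write_chunk(chunk)  # returns before the upload completes
        io_accumulated_time += time.perf_counter() - t0
    writer.close()
    io_wall_time = time.perf_counter() - io_wall_start
    return {"io_accumulated_time": io_accumulated_time, "io_wall_time": io_wall_time}
```

In this sketch the accumulated value only covers the cost of submitting tasks, while the wall-clock value also covers the simulated uploads that finish during `close()`.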
…riter.py

SC-1: Async upload latency not captured in io_time

The s3torchconnector writer streams data to S3 via write(), which queues bytes internally and returns before the network transmission completes. write_chunk() therefore returns immediately after handing off the buffer, so the per-chunk timer in streaming_checkpoint.py captured only the handoff cost (microseconds), not the actual upload latency.

The wall-clock io_wall_time fix in streaming_checkpoint.py (SC-1) addresses the measurement. This file is listed as affected because its async behaviour is the root cause.

SC-2: bytes() memcpy inside write_chunk() added CPU time to I/O metric

data = bytes(buffer[:size]) performed a full in-process copy of up to 32 MB inside write_chunk(), inside the timed I/O window. For a 32 MB chunk this is ~10–50 ms of pure CPU memcpy attributed as I/O time, which understated I/O throughput.

Fixed by:
- Changing the buffer parameter type annotation from memoryview to bytes
- Removing the data = bytes(buffer[:size]) internal copy line
- Passing buffer directly to self.writer.write()

The conversion now happens in streaming_checkpoint.py before io_start, outside the timed window.
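The SC-2 change, converting the buffer to bytes before the timer starts, might look roughly like the sketch below. `write_chunks` and its arguments are assumptions made for illustration; a `memoryview` over a `bytearray` stands in for the shared-memory buffer.

```python
import time


def write_chunks(buffers, write_chunk):
    """Time only the write_chunk() calls; the bytes() copy happens before io_start.

    buffers: iterable of (buffer, nbytes) pairs, where buffer supports slicing
    (a memoryview here stands in for shm.buf).
    write_chunk: callable that accepts a bytes object.
    """
    io_time = 0.0
    for buf, nbytes in buffers:
        chunk_bytes = bytes(buf[:nbytes])  # CPU copy, outside the timed window
        io_start = time.perf_counter()
        write_chunk(chunk_bytes)
        io_time += time.perf_counter() - io_start
    return io_time
```

Passing `list.append` as `write_chunk` is enough to confirm that the callee receives an already-converted `bytes` object of the requested length.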
…l data

When mpirun exits with a non-zero return code, one or more ranks failed and the collected_data dict may reflect only the ranks that completed before the failure. The previous code logged a warning and then returned that partial data as if collection had succeeded. Downstream validation (ClusterInformation.validate_cluster_consistency()) received no signal that data was missing and silently validated an incomplete cluster view.

Fixed by replacing the warning-and-continue block with a RuntimeError that includes the exit code, the count of hosts that did report, and mpirun's stderr. The caller in base.py already wraps collect_cluster_info() in a try/except that handles collection failures gracefully (logs a warning and continues without cluster data), so no additional call-site changes are needed.
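The fail-loudly behaviour described above can be sketched with `subprocess.run`. The function name `run_collection`, its arguments, and the message wording are hypothetical; the real collector's command construction and output parsing are not shown in this PR text.

```python
import subprocess
import sys


def run_collection(cmd, collected_data):
    """Run a collection command; raise instead of returning partial data on failure.

    cmd: argument list for the launcher (mpirun in the PR; any command here).
    collected_data: dict of per-host results gathered so far (assumed shape).
    """
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(
            f"collection command exited with code {result.returncode}; "
            f"{len(collected_data)} host(s) reported; "
            f"stderr: {result.stderr.strip()}"
        )
    return collected_data
```

A caller that wants the previous lenient behaviour can still catch `RuntimeError` and continue without cluster data, which is what the commit message says base.py already does.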
…ode check
In the CLOSED subset-mode MPI process count check, model_key was
referenced in the if branch error log but was only assigned inside the
else branch (after a successful regex match on model_size). At runtime,
entering the if branch with checkpoint_mode == "subset" always raised
NameError before the error message could be emitted. The exception was
silently swallowed by the outer try/except, causing the check to pass
unconditionally for CLOSED subset-mode submissions with the wrong
process count.
Fixed by replacing model_key with model_name in the error log at the
affected line. model_name is assigned unconditionally earlier in the
function from metadata.get("args", {}).get("model", "").lower() and
is the correct human-readable identifier to include in the error
message. The model_key assignment in the else branch is unchanged and
still used correctly in that branch.
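The failure mode described in this commit message can be reproduced in isolation. Both functions below are hypothetical reductions written for this example, and the `model_key` derivation in the else branch is invented. In Python, reading a local that is only bound on another branch raises `UnboundLocalError`, which is a subclass of `NameError`.

```python
def check_buggy(checkpoint_mode, model_name, num_processes, expected, errors):
    """Reduced version of the bug: the error message references an unbound name."""
    try:
        if checkpoint_mode == "subset":
            if num_processes != expected:
                # model_key is only bound in the else branch, so building this
                # message raises UnboundLocalError before anything is appended.
                errors.append(f"{model_key}: expected {expected}, got {num_processes}")
                return False
        else:
            model_key = model_name.replace("-", "_")  # hypothetical derivation
    except Exception:
        pass  # broad handler hides the failure
    return True


def check_fixed(checkpoint_mode, model_name, num_processes, expected, errors):
    """Same check, using model_name, which is always bound."""
    try:
        if checkpoint_mode == "subset":
            if num_processes != expected:
                errors.append(f"{model_name}: expected {expected}, got {num_processes}")
                return False
    except Exception:
        pass
    return True
```

With a wrong process count in subset mode, the first function returns True and records no error, while the second returns False and records one message.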
…n training_checks.py
RULES-1 — 500-steps dataset minimum formula is circular; check never fires
The formula derived num_steps_per_epoch from the actual file count
using max(MIN_STEPS_PER_EPOCH, actual_steps), then multiplied back to
get min_samples_steps. Because max(MIN_STEPS_PER_EPOCH, actual_steps) >= actual_steps, min_samples_steps
was always >= the actual sample count, so the 500-steps constraint never
produced a "too few files" error regardless of actual dataset size.
Fixed by replacing the two-step calculation with the direct formula:
min_samples_steps = MIN_STEPS_PER_EPOCH * batch_size * num_accelerators
This matches the canonical computation in rules/utils.py.
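A worked instance of the direct formula, as a minimal sketch. The value 500 comes from the "500-steps" rule named above; the function names and the comparison against an actual sample count are assumptions for illustration.

```python
MIN_STEPS_PER_EPOCH = 500  # from the "500-steps" rule named in this commit


def min_samples_for_steps(batch_size, num_accelerators):
    """Direct formula: the minimum does not depend on the actual file count."""
    return MIN_STEPS_PER_EPOCH * batch_size * num_accelerators


def has_enough_samples(actual_samples, batch_size, num_accelerators):
    """Hypothetical check: the dataset must supply at least the minimum sample count."""
    return actual_samples >= min_samples_for_steps(batch_size, num_accelerators)
```

For example, with a batch size of 4 and 8 accelerators the minimum is 500 * 4 * 8 = 16000 samples.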
RULES-2 — Multi-host memory total wrong for mixed-memory clusters
host_memory_GB is a list with one entry per host, but the checker took
only the first element ([0]) and multiplied by num_hosts. For clusters
where hosts have different memory sizes, this produced an incorrect
total that could be larger or smaller than the true sum. Fixed by
replacing the [0]-index-and-multiply pattern with:
total_host_memory = sum(summary.get("host_memory_GB", [0]))
This matches the canonical pattern in rules/utils.py which sums the
per-host memory list.
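The difference between the two totals shows up as soon as hosts differ. The sketch below uses a made-up three-host summary; the function names are hypothetical, and only the `sum(summary.get("host_memory_GB", [0]))` expression is taken from the commit message.

```python
def total_memory_first_times_hosts(summary, num_hosts):
    """Old pattern: assume every host matches the first entry."""
    return summary.get("host_memory_GB", [0])[0] * num_hosts


def total_memory_summed(summary):
    """New pattern: sum the per-host list."""
    return sum(summary.get("host_memory_GB", [0]))
```

For a summary of `{"host_memory_GB": [512, 256, 256]}`, the old pattern gives 1536 while the summed total is 1024.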
RULES-4 — AU check reads non-existent DLIO fields; every submission fails
The accelerator utilization check looked up train_au_mean_percentage
and train_au_meet_expectation from the DLIO summary JSON. Neither field
exists in actual DLIO output (the real field is train_au_percentage, a
list of per-epoch AU values). Both .get() calls always returned their
defaults (0 and "" respectively), causing au_expectation != "success"
to be permanently True. Every training submission was flagged as an AU
failure regardless of actual utilization, making real failures and
valid runs indistinguishable. Fixed by reading train_au_percentage,
computing its mean, and comparing against the 90% MLPerf Storage
minimum (Rules.md §3.3.2).
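The revised AU check can be sketched as follows. The field name `train_au_percentage` and the 90% threshold come from the text above; the function name, the return shape, the handling of a missing list, and the assumption that the field sits at the top level of the summary dict are illustrative choices that may differ from training_checks.py.

```python
from statistics import mean

AU_MIN_PERCENT = 90.0  # threshold stated in the commit message


def au_check(summary):
    """Return (passed, mean_au) from a DLIO-style summary dict.

    Assumes train_au_percentage is a list of per-epoch AU values; a missing
    or empty list is treated as a failure with no mean.
    """
    values = summary.get("train_au_percentage", [])
    if not values:
        return False, None
    au = mean(values)
    return au >= AU_MIN_PERCENT, au
```

A summary with per-epoch values of 95.0 and 91.0 has a mean of 93.0 and passes, while 85.0 and 92.0 average 88.5 and fail.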
…acy.py
Direct dict access dataset_params['record_length_bytes'] raised KeyError
for parquet-format datasets where this key is absent. The active code
path in rules/utils.py handles parquet correctly, but the legacy verifier
still routes training runs through TrainingRunRulesChecker, which called
this function. Parquet workloads using the legacy path crashed with
KeyError rather than validating.
Fixed by replacing the direct access with .get():
record_length_bytes = dataset_params.get('record_length_bytes')
When record_length_bytes is absent or zero, a warning is logged and
file_size_bytes is set to 0. The subsequent min_num_files_by_bytes
calculation is guarded against division by zero:
min_num_files_by_bytes = (dataset_size_bytes // file_size_bytes) if file_size_bytes else 0
With min_num_files_by_bytes = 0, required_file_count falls back to the
samples-based minimum, which matches the parquet behaviour in the active
rules path.
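Put together, the guarded calculation might look like the sketch below. The `.get()` call and the guarded division follow the lines quoted above; the function name, the logger, and the `num_samples_per_file` multiplier used to derive `file_size_bytes` are assumptions, since the PR text does not show how the legacy code computes file size.

```python
import logging

logger = logging.getLogger(__name__)


def min_files_by_bytes(dataset_params, dataset_size_bytes):
    """Return the bytes-based file minimum, or 0 when record length is unknown."""
    record_length_bytes = dataset_params.get('record_length_bytes')
    if not record_length_bytes:
        logger.warning("record_length_bytes absent or zero; skipping bytes-based minimum")
        file_size_bytes = 0
    else:
        # Assumed derivation of file size; the real formula may differ.
        file_size_bytes = record_length_bytes * dataset_params.get('num_samples_per_file', 1)
    # Guard against division by zero, as in the commit message.
    return (dataset_size_bytes // file_size_bytes) if file_size_bytes else 0
```

A parquet-style parameter dict without the key yields 0 instead of raising `KeyError`, leaving the samples-based minimum to decide the required file count.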
MLCommons CLA bot: All contributors have signed the MLCommons CLA ✍️ ✅
I ran the checkpointing benchmark on my local repository after merging this PR and noticed a significant write performance regression.

Benchmark command:

./mlpstorage checkpointing run \
--model llama3-8b \
--hosts 192.168.255.82 \
--num-processes 8 \
--num-checkpoints-read 0 \
--num-checkpoints-write 1 \
--checkpoint-folder /mnt/pfs/checkpoint \
--results-dir /home/mlperf/results \
--client-host-memory-in-gb 503 \
--exec-type mpi \
--closed \
--allow-run-as-root

After merging PR #397:
Before merging PR #397:

Summary: Checkpoint write throughput dropped from ~54 GiB/s to ~10 GiB/s (~5x regression), and duration increased from ~2s to ~10.4s.

Could you take a look at what might be causing this? Happy to provide additional debugging info if needed.
Excellent!! Thank you! This PR (like the others I recently submitted) was a result of asking Claude for a broad review of the code looking for bugs. It very much needs code review and performance testing before we accept it, so thank you for helping!
Summary
This PR fixes 9 HIGH severity bugs identified in a broad codebase scan (BROAD_SCAN.md). The bugs fall into three subsystems: checkpointing I/O timing (SC-1 through SC-5), MPI cluster collection (MPI-1), and submission checker validation (RULES-1 through RULES-5). All changes are targeted fixes with no refactoring.
Checkpointing / Streaming Checkpoint (SC-1 through SC-5)
SC-1: Async S3 I/O time invisible to reported metric
Files: streaming_checkpoint.py, minio_writer.py, s3torch_writer.py
For MinIO and s3torchconnector backends, write_chunk() returns before ThreadPoolExecutor UploadPart tasks complete. The per-chunk timer accumulated only buffer-handoff microseconds, not actual network transmission time. Result: io_throughput was overstated for async backends.
Fix: Capture io_wall_start/io_wall_end around the full write+close pipeline. Report io_wall_time as the throughput denominator; retain the old accumulated value as io_accumulated_time for diagnostics.

SC-2: memcpy inside timed I/O window inflates CPU time in metric
Files: streaming_checkpoint.py, minio_writer.py, s3torch_writer.py
bytes(buffer[:size]), a full memcpy of up to 32 MB, executed inside the io_start/io_stop window in _writer_process(), adding ~10–50 ms of CPU-bound time per chunk to reported I/O time and understating I/O throughput.
Fix: Move chunk_bytes = bytes(shm.buf[:nbytes]) to before io_start in the caller. Remove the internal copy from minio_writer.write_chunk() and s3torch_writer.write_chunk(); update their parameter types from memoryview to bytes.

SC-3: total_time includes process setup overhead
File: streaming_checkpoint.py
start_time was captured before buffer pool creation (up to 2 GB shared memory), generator init, IPC queue/event creation, and process fork, all of which can take seconds. pipeline_overhead_pct consequently attributed process initialization cost as pipeline stall time.
Fix: Split into setup_start (original position) and pipeline_start (after writer_proc.start()). total_time now measures the data pipeline only; setup_time is included in results for diagnostic visibility.

SC-4: gen_throughput and io_throughput use different byte counts
File: streaming_checkpoint.py
gen_throughput used total_size_bytes (requested) while io_throughput used stats['total_bytes'] (actual written). throughput_ratio = gen/io compared different quantities, producing a meaningless ratio when the values diverged.
Fix: Use stats['total_bytes'] for both numerators. Emit a warning when requested and actual counts differ.

SC-5: Parallel reader backend_stats discards all-but-last reader
File: streaming_checkpoint.py
In load()'s parallel reader finally block, backend_stats = r.close() was assigned inside a loop over all readers, overwriting on every iteration. All readers except the last had their backend stats (S3 request counts, errors, retries) silently discarded.
Fix: Initialize all_backend_stats = [] before the loop, append each reader's stats, and return the full list.

MPI / Cluster Collection (MPI-1)
MPI-1: Non-zero mpirun exit returns partial data as if collection succeeded
File: cluster_collector.py
When mpirun exits non-zero (one or more ranks failed), the code logged a warning and returned the partial collected_data dict. Downstream validation (ClusterInformation.validate_cluster_consistency()) received no signal that data was missing and silently validated an incomplete cluster view.
Fix: Replace the warning-and-return with raise RuntimeError(...) that includes the exit code, partial host count, and stderr. The caller in base.py already handles collection failures gracefully.

Rules / Submission Checker (RULES-1 through RULES-5)
RULES-1: 500-steps dataset minimum formula is circular; check never fires
File: submission_checker/checks/training_checks.py
num_steps_per_epoch was derived from the actual file count using max(MIN_STEPS_PER_EPOCH, actual_steps), then multiplied back. Because max(MIN_STEPS_PER_EPOCH, actual_steps) >= actual_steps, min_samples_steps was always ≥ actual samples, so the constraint never produced a "too few files" error.
Fix: Replace with the direct formula min_samples_steps = MIN_STEPS_PER_EPOCH * batch_size * num_accelerators, matching rules/utils.py.

RULES-2: Multi-host memory total wrong for mixed-memory clusters
File: submission_checker/checks/training_checks.py
host_memory_GB is a per-host list. The checker took only [0] and multiplied by num_hosts, giving an incorrect total whenever hosts have different memory capacities.
Fix: Replace with total_host_memory = sum(summary.get("host_memory_GB", [0])), matching rules/utils.py.

RULES-3: NameError in subset-mode process count check; check silently passes
File: submission_checker/checks/checkpointing_checks.py
model_key was referenced in the if checkpoint_mode == "subset": branch but was only assigned in the else: branch. The NameError was silently swallowed, causing CLOSED subset-mode submissions with the wrong process count to pass validation unconditionally.
Fix: Replace model_key with model_name (assigned unconditionally from metadata) in the error log.

RULES-4: AU check reads non-existent DLIO fields; every submission fails
File: submission_checker/checks/training_checks.py
train_au_mean_percentage and train_au_meet_expectation do not exist in DLIO output. Both .get() calls always returned defaults (0 and ""), making au_expectation != "success" permanently True. Every training submission was flagged as an AU failure regardless of actual utilization.
Fix: Read the real field train_au_percentage (a list), compute its mean, and compare against the 90% MLPerf Storage minimum (Rules.md §3.3.2).

RULES-5: KeyError on parquet datasets in legacy rules path
File: rules_legacy.py
Direct dataset_params['record_length_bytes'] raised KeyError for parquet datasets where this key is absent. Parquet workloads routed through the legacy verifier crashed rather than validating.
Fix: Use .get() with a warning when absent, set file_size_bytes = 0, and guard min_num_files_by_bytes against division by zero. Falls back to the samples-based minimum, matching the active rules path.

Test plan
pytest tests/unit -v
train_au_percentage (RULES-4)