Skip to content

[CELEBORN-2350] Support chunk level compression to optimize storage#3699

Open
saurabhd336 wants to merge 25 commits into
apache:mainfrom
saurabhd336:chunkCompressedWriter
Open

[CELEBORN-2350] Support chunk level compression to optimize storage#3699
saurabhd336 wants to merge 25 commits into
apache:mainfrom
saurabhd336:chunkCompressedWriter

Conversation

@saurabhd336

@saurabhd336 saurabhd336 commented May 23, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Adds chunk-level ZSTD compression on the worker write path and streaming decompression on the client read path. Records accumulate in a fixed-size chunk buffer (default 8 MB); when the buffer overflows it is compressed as a single ZSTD frame and written to disk file. On the read side, CelebornInputStream wraps each fetched chunk in a ZstdInputStream to uncompress.

This is orthogonal to the existing batch-level LZ4/ZSTD codec: both can be active simultaneously, or the batch codec can be NONE.

This change primarily helps in reducing the disk usage (~40% lower disk usage seen in tests) as well as read flow celeborn network egress.

Impl details

Writer side

  1. Added a new FileChannelWriter interface which supports write / close functionalities. BypassFileChannelWriter is the default and ensures the current behaviour (directly write flushBuffer to disk file channel).
  2. Added ChunkCompressedFileChannelWriter: Accumulates records in a direct ByteBuffer of chunkSize bytes. On overflow, ZSTD-compresses and writes as a single frame. Records larger than chunkSize stream directly to disk via ZstdOutputStream. Replaces compressed chunk-boundary offsets into ReduceFileMeta on close. Also updates the bytesFlushed to overwrite the FileInfo length post close. The buffers used to buffer chunkSize data before compression and flush is powered by MmapMemoryManager and ChunkBufferPool which uses mmap'ed temporary files to avoid the memory overhead of buffering chunk sized data.
  3. Choice b/w ChunkCompressedFileChannelWriter and FileChannelWriter is made basis the new config set by client during ReserveSlots (conf.isChunkCompressionEnabled).

Read side

  1. No changes in the worker (YET TO IMPLEMENT: Partition sorting during AQE flow)
  2. CelebornInputStream: When reading chunkCompressed chunks, wraps the read ByteBuf into a ZSTDIs to inplace decompress and read.

Configs added

Key Default Meaning
celeborn.chunk.compression.enabled false Client side config. Enables chunk-level ZSTD compression on the worker write path and transparent decompression in CelebornInputStream.

Does this PR resolve a correctness bug?

No

Does this PR introduce any user-facing change?

celeborn.chunk.compression.enabled config to enable / disable chunk level compression (disabled by default)

How was this patch tested?

UTs, ITs

@saurabhd336 saurabhd336 changed the title [FEATURE] [WIP] Support chunk level compression to optimize storage [CELEBORN-XXXX] [FEATURE] [WIP] Support chunk level compression to optimize storage May 25, 2026
@saurabhd336

Copy link
Copy Markdown
Contributor Author

Hi team @SteNicholas / @s0nskar / @zaynt4606 / others

I wanted to start an early discussion for these proposed changes (can share a design doc too).

At a high level, our Celeborn infra costs have largely been influenced by large locally attached SSD requirements. Additionally, also looking for ways to reduce celeborn network ingress / egress. This change helps is reducing both.

Wanted to start this discussion for the change thanks

@codecov

codecov Bot commented May 26, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 95.00000% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 67.28%. Comparing base (b4cb5a0) to head (39595da).
⚠️ Report is 54 commits behind head on main.

Files with missing lines Patch % Lines
.../org/apache/celeborn/common/meta/DiskFileInfo.java 85.72% 1 Missing ⚠️
...rg/apache/celeborn/common/meta/ReduceFileMeta.java 88.89% 0 Missing and 1 partial ⚠️
...leborn/common/network/buffer/FileChunkBuffers.java 50.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3699      +/-   ##
==========================================
+ Coverage   66.91%   67.28%   +0.37%     
==========================================
  Files         358      360       +2     
  Lines       21986    22337     +351     
  Branches     1946     1982      +36     
==========================================
+ Hits        14710    15027     +317     
- Misses       6262     6285      +23     
- Partials     1014     1025      +11     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@SteNicholas SteNicholas force-pushed the main branch 2 times, most recently from 1d92a40 to cf8d472 Compare May 27, 2026 02:11
saurabhd336 and others added 9 commits June 1, 2026 12:48
…ge-record chunks

readChunks() was unconditionally decompressing every chunk, but large
records are written raw (uncompressed) by flushLargeRecord(). The fix
consults ReduceFileMeta.getChunkCompressed() per chunk and only calls
ZstdInputStream on chunks that were actually compressed. Also exposes
compressAndFlush() as public so the test can call it directly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@saurabhd336 saurabhd336 changed the title [CELEBORN-XXXX] [FEATURE] [WIP] Support chunk level compression to optimize storage [CELEBORN-XXXX] Support chunk level compression to optimize storage Jun 5, 2026
@saurabhd336

Copy link
Copy Markdown
Contributor Author

Hi team
@SteNicholas / @s0nskar / @zaynt4606 / others

Ping on this again!

So far, i've been able to get ~40% reduction in celeborn worker disk usage with a roughly 80% increase in CPU usage. A lot of times, the CPU usage of our celeborn fleets is idle, hence this tradeoff feels reasonable and worth testing at scale for us. Can you please take a look at this PR?

@saurabhd336 saurabhd336 changed the title [CELEBORN-XXXX] Support chunk level compression to optimize storage [CELEBORN-2350] Support chunk level compression to optimize storage Jun 5, 2026
@SteNicholas SteNicholas requested a review from Copilot June 8, 2026 06:19

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces chunk-level ZSTD compression for shuffle data written by workers (disk write path) and adds transparent per-chunk decompression on the client read path (via CelebornInputStream). It threads a new ChunkCompressionContext from client → worker (ReserveSlots) and extends stream metadata so clients can identify which chunks are compressed (to support “large record” chunks written uncompressed).

Changes:

  • Add worker-side FileChannelWriter abstraction with a new ChunkCompressedFileChannelWriter that buffers into fixed-size chunks and ZSTD-compresses each chunk before writing.
  • Extend wire/file metadata (PbStreamHandler, PbFileInfo, ReduceFileMeta) to carry chunk-compressed flags and chunk compression config.
  • Update client readers and CelebornInputStream to support streaming chunk decompression while preserving existing batch-level codecs.

Reviewed changes

Copilot reviewed 49 out of 49 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala Update test writer creation call to pass ChunkCompressionContext.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala Update writer/file-info construction for new chunk compression context parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala Adjust mocked writer ctor signature for added parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala Adjust mocked writer ctor signature for added parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala Adjust mocked writer ctor signature for added parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala Adjust mocked writer ctor signature for added parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala Update writer creation for added chunk compression context parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala Update DiskFileInfo construction for added chunk compression context parameter.
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala New end-to-end mini-cluster tests for chunk-compressed read/write scenarios.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java Update writer context creation for new chunk compression context parameter.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java Update writer context creation for new chunk compression context parameter.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java Update writer context creation for new chunk compression context parameter.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/MmapMemoryManagerSuiteJ.java New unit tests for mmap-based buffer allocator.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/ChunkCompressedFileChannelWriterSuiteJ.java New unit tests validating chunk writer chunking/offsets/compression behavior.
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/ChunkBufferPoolSuiteJ.java New unit tests for pooled buffer pair behavior and concurrency.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala Switch local tier writer flush to use FileChannelWriter abstraction.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala Pass chunk compression context into disk file creation.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala Thread chunk compression context through writer creation and DiskFileInfo.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala Use FileChannelWriter in local flush task rather than raw FileChannel.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala Include per-chunk compression flags in stream handler metadata to clients.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala Receive/pass chunk compression context from ReserveSlots to writer creation.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java Explicitly reject sorting chunk-compressed shuffle files (not yet supported).
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java Add ChunkCompressionContext to writer context and accessors.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileWriterType.java New enum for file writer types.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileChannelWriterFactory.java Factory to choose bypass vs chunk-compressed writer based on file info.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileChannelWriter.java New abstraction for file write/close.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/MmapMemoryManager.java New mmap-backed allocator for chunk buffers.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/ChunkCompressedFileChannelWriter.java New chunk-buffering + ZSTD-compressing local file writer.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/ChunkBufferPool.java New pool for reusable (chunk, compressed) buffer pairs.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/BypassFileChannelWriter.java New writer preserving existing direct-to-channel behavior.
worker/pom.xml Add zstd-jni dependency for worker module.
project/CelebornBuild.scala Add zstd JNI dependency for worker build.
docs/configuration/client.md Document new chunk compression configs.
common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala Update PB serde tests for new file info ctor + chunk compression context.
common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala Serialize/deserialize chunk compression config into PbFileInfo.
common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala Send/receive chunk compression config in ReserveSlots messages.
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala Add chunk compression config entries + accessors.
common/src/main/proto/TransportMessages.proto Add PbChunkCompressionConfig and per-chunk compressed flags to stream handler.
common/src/main/java/org/apache/celeborn/common/network/buffer/FileChunkBuffers.java Enforce no sliced reads for chunk-compressed files.
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java Track per-chunk compression flags in reduce metadata.
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java Add setBytesFlushed to override file length post-close.
common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java Add ChunkCompressionContext and expose chunk compression settings.
common/src/main/java/org/apache/celeborn/common/compression/ChunkCompressionContext.java New context object to carry chunk compression settings.
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala Send chunk compression settings to worker via ReserveSlots.
client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java Return (ByteBuf, isChunkCompressed) from next().
client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java Change API to return (ByteBuf, isChunkCompressed) from next().
client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java Return per-chunk compression flag alongside the chunk buffer.
client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java Return per-chunk compression flag alongside the chunk buffer.
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java Stream per-chunk ZSTD decompression and handle large-record uncompressed chunks.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +258 to +263
int chunkIdx = returnedChunks;
returnedChunks++;
return chunk;
// If no per-chunk list was sent (old worker), treat as compressed to honour the global flag.
boolean compressed =
streamHandler.getChunkCompressedCount() == 0 || streamHandler.getChunkCompressed(chunkIdx);
return Pair.of(chunk, compressed);
Comment on lines 873 to 883
if (shouldDecompress) {
if (size > compressedBuf.length) {
compressedBuf = new byte[size];
}

currentChunk.readBytes(compressedBuf, 0, size);
readFully(currentStream, compressedBuf, 0, size);
} else {
if (size > rawDataBuf.length) {
rawDataBuf = new byte[size];
}

currentChunk.readBytes(rawDataBuf, 0, size);
readFully(currentStream, rawDataBuf, 0, size);
}
Comment on lines 39 to 43
@Override
public ManagedBuffer chunk(int chunkIndex, int offset, int len) {
// sliced reads unsupported for chunkCompressed files
assert (!isChunkCompressed || (offset == 0 && len == Integer.MAX_VALUE));
Tuple2<Long, Long> offsetLen = getChunkOffsetLength(chunkIndex, offset, len);
Comment on lines +5291 to +5300
val CHUNK_COMPRESSION_LEVEL: ConfigEntry[Int] =
buildConf("celeborn.chunk.compression.level")
.categories("client")
.doc(
"ZSTD compression level to use for chunk-level compression " +
"(celeborn.chunk.compression.enabled must be true). " +
"Valid range is 1–22; the default (3) matches the ZSTD library default.")
.version("0.6.0")
.intConf
.createWithDefault(3)
Comment on lines +132 to +137
@Override
public void close(boolean commitFilesFsync) {
try {
compressAndFlush();
if (commitFilesFsync) {
channel.force(false);
Comment on lines +62 to +67
ByteBuffer chunkBuf = MmapMemoryManager.getInstance().allocateBuffer((int) chunkSize);
// allocateDirect, NOT MmapMemoryManager: mmap duplicates share one backing region, so
// after clear() both chunkBuf and a mmap-backed compressedBuf would have position=0
// pointing to the same physical address. ZSTD would then write its frame header to
// mmap[0..N] before reading mmap[0..N] as input, silently corrupting the source.
ByteBuffer compressedBuf = MmapMemoryManager.getInstance().allocateBuffer((int) chunkSize);
Comment on lines +28 to +33
import java.util.logging.Logger;

public class MmapMemoryManager {
private static final Logger LOG = Logger.getLogger(MmapMemoryManager.class.getName());
private static MmapMemoryManager INSTANCE;
private static final long DEFAULT_FILE_LENGTH = 512 * 1024 * 1024L;
Comment on lines 329 to 332
returnedChunks++;
lastReturnedChunkId = chunk.getLeft();
return chunk.getRight();
return Pair.of(chunk.getRight(), true);
}

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a substantial, genuinely valuable feature (~40% disk reduction is a big deal), and the high-level design — local-disk-only scoping, a clean FileChannelWriter factory, appended proto fields, a guard on the sort path — shows good instincts. The core streaming-decompression machinery in CelebornInputStream is sound. But as implemented it is not safe to enable yet: there are several independent paths that cause silent data corruption or silent data loss when celeborn.chunk.compression.enabled=true. None affect the default-off path, so this isn't a regression for existing users — but the criticals below should block enabling/merging.

Critical (silent data loss / corruption when enabled)

C1 — Final-chunk flush failure is silently committed as success (data loss). ChunkCompressedFileChannelWriter.write() only flushes on overflow, so the trailing partial chunk is written only in close()compressAndFlush(). That close() catches IOException with an empty // log and ignore block, never signals the FlushNotifier, then unconditionally runs setBytesFlushed(...) + replaceFileMeta(...). Pre-PR, the final write happened on the flush thread and a failure propagated through notifier.checkException()Controller.commitFilesfailedIds. Now a failed last-chunk write commits the partition as success with its tail records missing — no FetchFailedException, no recompute. FileChannelWriter.close(boolean) doesn't declare throws, which is the root enabler. Fix: close() must propagate (or set the notifier) so the commit fails.

C2 — The per-chunk chunkCompressed list is never persisted → corruption after worker graceful-shutdown recovery. PbReduceFileMeta has only chunkOffsets + sorted; PbSerDeUtils.fromPbFileInfo rebuilds with new ReduceFileMeta(getChunkOffsetsList), dropping chunkCompressed/chunkSize. After recovery the stream handler sends an empty list, and the reader then defaults every chunk to compressed (see C4) — so any raw large-record chunk (flushLargeRecord records false) gets fed through ZstdInputStream → corrupt/failed reads. Fix: add chunkCompressed (and chunkSize) to PbReduceFileMeta and round-trip them; add a PbSerDeUtilsTest with an enabled context (the current test only uses disabled()).

C3 — DfsPartitionReader.next() hardcodes compressed = true. It returns Pair.of(chunk.getRight(), true) and never consults the per-chunk list. DFS/S3/OSS files are always written uncompressed (StorageManager forces disabled()), and a DFS network chunk is a dfsReadChunkSize slice, not a ZSTD frame. So enabled=true + remote storage wraps every raw slice in ZstdInputStream → total corruption. Fix: return false (DFS never chunk-compresses).

C4 — Decompression keys off the reader's global config + an "absent list ⇒ compressed" default, instead of the authoritative per-chunk flag. CelebornInputStream decompresses when (chunkCompressed && currentChunkCompressed), where chunkCompressed = conf.isChunkCompressionEnabled() (reader-side), and both readers compute getChunkCompressedCount()==0 || getChunkCompressed(idx) — i.e. absent list means "assume compressed." This is the lynchpin that turns C2/C3 and version skew into corruption: a NEW client (flag on) reading an OLD worker's uncompressed file, or a recovered file, decompresses raw bytes. Fix: make the per-chunk list the sole authority — absent/false ⇒ read raw; let the global config influence writes only.

High

H1 — LocalPartitionReader indexes the per-chunk flag with a relative index. getChunkCompressed(returnedChunks) but returnedChunks is relative to startChunkIndex; for skew/AQE sub-range reads the absolute index is startChunkIndex + returnedChunks (WorkerPartitionReader correctly uses the absolute chunk index). On a mixed compressed/raw file read via skew, the wrong flag is used → silent corruption.

H2 — AQE/skew read of a chunk-compressed file throws an uncaught UnsupportedOperationException. The PartitionFilesSorter guard throws a RuntimeException, but FetchHandler.handleReduceOpenStreamInternal/handleOpenStreamInternal catch only IOException, so it escapes: no OPEN_STREAM_FAILED reply, request hangs to timeout, no metric. Either throw IOException and surface it cleanly, or — better — reject enabled + AQE/skew at reserve time so the combination can't arise.

Medium

  • M1 — Compressed buffer sized at chunkSize, not Zstd.compressBound(chunkSize). A full incompressible chunk (likely, since this can stack on batch-level LZ4/ZSTD) overflows the destination and ZSTD throws; the return code also isn't checked with Zstd.isError. The client's own ZstdCompressor already uses compressBound — follow that. Combined with C1, an overflow on the last chunk = silent loss.
  • M2 — MmapMemoryManager.close() has no caller; the buffers are untracked, unbounded-until-peak, and /tmp-backed. Backing files (512 MB each) leak for the worker lifetime (reclaimed only via deleteOnExit), are not accounted by Celeborn's MemoryManager (bypassing congestion control), and on a tmpfs /tmp are RAM the worker doesn't know it holds. Ownership/lifecycle and memory budgeting need to be wired in.
  • M3 — bytesFlushed dual semantics. It's accumulated as uncompressed during flush, then overwritten with the compressed total in close(). Works only because of strict close-after-flush ordering and full meta replacement; fragile, and disk-usage metrics (getFileLength) now report compressed while in-flight incrementDiskBuffer tracks uncompressed.
  • M4 — celeborn.chunk.compression.level has no checkValue despite the documented 1–22 range; out-of-range passes straight to ZSTD.

Low / Nit

  • ChunkBufferPool comment says "allocateDirect, NOT MmapMemoryManager … would silently corrupt the source," but the code allocates the compressed buffer from MmapMemoryManager. It's safe today (the manager hands out non-overlapping slices), but the comment is actively misleading — fix it.
  • CHUNK_COMPRESSION_ENABLED is tagged version("0.3.0") (and in docs) for a feature added now — should be the current release.
  • destroy() now runs a wasted compress+write right before deleting the file.
  • Read metrics (incBytesRead) count decompressed batch sizes, over-reporting vs on-wire bytes.

Suggested framing

The cleanest fix narrative ties most of the criticals together: (1) make the per-chunk list the single source of truth for decompression (absent ⇒ raw), (2) persist that list, (3) have DFS report false, and (4) make close() propagate flush failures. With those plus compressBound and the mmap lifecycle, the feature gets much closer to safely enable-able. Given the promising results and that this was opened for early discussion, it might be worth landing it explicitly gated/experimental with these tracked as blockers-before-GA.

@saurabhd336 saurabhd336 requested a review from Copilot June 9, 2026 08:03

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 50 out of 50 changed files in this pull request and generated 4 comments.

Comments suppressed due to low confidence (4)

common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala:1

  • PbFileInfo now includes chunkCompressionConfig, and fromPbFileInfo reconstructs DiskFileInfo using pbFileInfo.getChunkCompressionConfig. However, toPbFileInfo never sets chunkCompressionConfig, so serialized metadata will always deserialize to (enabled=false, level=0) even for chunk-compressed shuffles. Populate builder.setChunkCompressionConfig(...) using the source file info's ChunkCompressionContext to preserve correctness across PB round-trips.
/*

worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1

  • If the read-back byte stream is corrupted or mis-ordered such that no expected prefix is found, this code silently drops the remainder (remaining = \"\"). That can let tests pass incorrectly (the loop that asserts blob content only iterates over whatever was extracted). Make this fail fast (e.g., throw/assert when None is hit) and assert that all expected prefixes were extracted (e.g., result.size == prefixes.length).
    worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
  • The fixed Thread.sleep calls will make the suite significantly slower and can be flaky under load (timing-dependent). Prefer waiting on an explicit condition/ack (e.g., rely on mapperEnd/commit completion, or poll for committed state with a bounded timeout) and avoid unconditional sleeps in finally.
    worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
  • The fixed Thread.sleep calls will make the suite significantly slower and can be flaky under load (timing-dependent). Prefer waiting on an explicit condition/ack (e.g., rely on mapperEnd/commit completion, or poll for committed state with a bounded timeout) and avoid unconditional sleeps in finally.

Comment on lines +27 to +36
public static FileChannelWriter getFileChannelWriter(
DiskFileInfo diskFileInfo, long chunkSize, ChunkBufferPool chunkBufferPool)
throws IOException {
if (diskFileInfo.isChunkCompressionEnabled()) {
return new ChunkCompressedFileChannelWriter(
diskFileInfo, chunkSize, diskFileInfo.getChunkCompressionLevel(), chunkBufferPool);
} else {
return new BypassFileChannelWriter(diskFileInfo);
}
}
Comment on lines +142 to +172
@Override
public void close(boolean commitFilesFsync) throws IOException {
if (closed) {
return;
}
closed = true;
IOException failure = null;
try {
compressAndFlush();
if (commitFilesFsync) {
channel.force(false);
}
} catch (IOException e) {
failure = e;
} finally {
try {
channel.close();
} catch (IOException e) {
if (failure == null) {
failure = e;
}
}
}

if (failure != null) {
throw failure;
}
diskFileInfo.setBytesFlushed(chunkOffsets.get(chunkOffsets.size() - 1));
diskFileInfo.replaceFileMeta(new ReduceFileMeta(chunkOffsets, chunkCompressed));
chunkBufferPool.release(bufferPair);
}
Comment on lines 178 to 185
val reduceFileMeta = fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta]
builder.setPartitionType(PartitionType.REDUCE.getValue)
builder.addAllChunkOffsets(reduceFileMeta.getChunkOffsets)
if (reduceFileMeta.getChunkCompressed != null && !reduceFileMeta.getChunkCompressed.isEmpty) {
builder.addAllChunkCompressed(reduceFileMeta.getChunkCompressed)
}
}
builder.build
Comment on lines +88 to +110
public synchronized ByteBuffer allocateBuffer(long size) {
addFileIfNecessary(size);
ByteBuffer buffer = _currentBuffer.duplicate();
buffer.position((int) _availableOffset);
buffer.limit((int) (_availableOffset + size));
_availableOffset += size;
return buffer.slice();
}

public void close() {
// MappedByteBuffers cannot be explicitly unmapped in Java; GC handles the unmap.
// We clear the internal state and delete the backing files so disk space is reclaimed.
_memMappedBuffers.clear();
for (String path : _paths) {
File file = new File(path);
if (!file.delete()) {
LOG.warn("Unable to delete mmap backing file: {}", file);
}
}
_paths.clear();
_curFileLen = -1;
_availableOffset = DEFAULT_FILE_LENGTH;
}
@saurabhd336 saurabhd336 requested a review from Copilot June 9, 2026 08:15

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 50 out of 50 changed files in this pull request and generated 7 comments.

Comments suppressed due to low confidence (2)

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:1

  • LocalTierWriter closes fileChannelWriter in both closeStreams() and closeResource(). ChunkCompressedFileChannelWriter is idempotent via an internal closed flag, but BypassFileChannelWriter.close() is not; a second close can throw and potentially mask the original failure / complicate cleanup. Recommendation (required): make FileChannelWriter.close(...) idempotent for all implementations (e.g., add a closed guard to BypassFileChannelWriter), or ensure LocalTierWriter only closes once (e.g., track a closed flag and skip in closeResource() if already closed).
    worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:1
  • LocalTierWriter closes fileChannelWriter in both closeStreams() and closeResource(). ChunkCompressedFileChannelWriter is idempotent via an internal closed flag, but BypassFileChannelWriter.close() is not; a second close can throw and potentially mask the original failure / complicate cleanup. Recommendation (required): make FileChannelWriter.close(...) idempotent for all implementations (e.g., add a closed guard to BypassFileChannelWriter), or ensure LocalTierWriter only closes once (e.g., track a closed flag and skip in closeResource() if already closed).

Comment on lines +170 to +171
diskFileInfo.setBytesFlushed(chunkOffsets.get(chunkOffsets.size() - 1));
diskFileInfo.replaceFileMeta(new ReduceFileMeta(chunkOffsets, chunkCompressed));
Comment on lines +861 to +867
while (true) {
if (readFully(currentStream, sizeBuf, 0, BATCH_HEADER_SIZE) < BATCH_HEADER_SIZE) {
closeCurrentStream();
if (!moveToNextChunk()) break;
setupCurrentStream();
continue;
}
}

currentChunk.readBytes(compressedBuf, 0, size);
readFully(currentStream, compressedBuf, 0, size);
}

currentChunk.readBytes(rawDataBuf, 0, size);
readFully(currentStream, rawDataBuf, 0, size);
this.localHostAddress = Utils.localHostName(conf);
this.shouldDecompress =
!conf.shuffleCompressionCodec().equals(CompressionCodec.NONE) && needDecompress;
this.chunkCompressed = conf.isChunkCompressionEnabled();
Comment on lines +826 to +827
currentStream =
(chunkCompressed && currentChunkCompressed) ? new ZstdInputStream(base) : base;
Comment on lines +81 to +86
ByteBuffer[] buffers = buffer.nioBuffers();
for (ByteBuffer byteBuffer : buffers) {
while (byteBuffer.hasRemaining()) {
chunkBuffer.put(byteBuffer);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants