[CELEBORN-2350] Support chunk level compression to optimize storage#3699
[CELEBORN-2350] Support chunk level compression to optimize storage#3699saurabhd336 wants to merge 25 commits into
Conversation
|
Hi team @SteNicholas / @s0nskar / @zaynt4606 / others I wanted to start an early discussion for these proposed changes (can share a design doc too). At a high level, our Celeborn infra costs have largely been influenced by large locally attached SSD requirements. Additionally, also looking for ways to reduce celeborn network ingress / egress. This change helps is reducing both. Wanted to start this discussion for the change thanks |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3699 +/- ##
==========================================
+ Coverage 66.91% 67.28% +0.37%
==========================================
Files 358 360 +2
Lines 21986 22337 +351
Branches 1946 1982 +36
==========================================
+ Hits 14710 15027 +317
- Misses 6262 6285 +23
- Partials 1014 1025 +11 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
1d92a40 to
cf8d472
Compare
…ge-record chunks readChunks() was unconditionally decompressing every chunk, but large records are written raw (uncompressed) by flushLargeRecord(). The fix consults ReduceFileMeta.getChunkCompressed() per chunk and only calls ZstdInputStream on chunks that were actually compressed. Also exposes compressAndFlush() as public so the test can call it directly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Hi team Ping on this again! So far, i've been able to get ~40% reduction in celeborn worker disk usage with a roughly 80% increase in CPU usage. A lot of times, the CPU usage of our celeborn fleets is idle, hence this tradeoff feels reasonable and worth testing at scale for us. Can you please take a look at this PR? |
There was a problem hiding this comment.
Pull request overview
This PR introduces chunk-level ZSTD compression for shuffle data written by workers (disk write path) and adds transparent per-chunk decompression on the client read path (via CelebornInputStream). It threads a new ChunkCompressionContext from client → worker (ReserveSlots) and extends stream metadata so clients can identify which chunks are compressed (to support “large record” chunks written uncompressed).
Changes:
- Add worker-side
FileChannelWriterabstraction with a newChunkCompressedFileChannelWriterthat buffers into fixed-size chunks and ZSTD-compresses each chunk before writing. - Extend wire/file metadata (
PbStreamHandler,PbFileInfo,ReduceFileMeta) to carry chunk-compressed flags and chunk compression config. - Update client readers and
CelebornInputStreamto support streaming chunk decompression while preserving existing batch-level codecs.
Reviewed changes
Copilot reviewed 49 out of 49 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala | Update test writer creation call to pass ChunkCompressionContext. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala | Update writer/file-info construction for new chunk compression context parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala | Adjust mocked writer ctor signature for added parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala | Adjust mocked writer ctor signature for added parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala | Adjust mocked writer ctor signature for added parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala | Adjust mocked writer ctor signature for added parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala | Update writer creation for added chunk compression context parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala | Update DiskFileInfo construction for added chunk compression context parameter. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala | New end-to-end mini-cluster tests for chunk-compressed read/write scenarios. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java | Update writer context creation for new chunk compression context parameter. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java | Update writer context creation for new chunk compression context parameter. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java | Update writer context creation for new chunk compression context parameter. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/MmapMemoryManagerSuiteJ.java | New unit tests for mmap-based buffer allocator. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/ChunkCompressedFileChannelWriterSuiteJ.java | New unit tests validating chunk writer chunking/offsets/compression behavior. |
| worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/file/chunk/compressed/ChunkBufferPoolSuiteJ.java | New unit tests for pooled buffer pair behavior and concurrency. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala | Switch local tier writer flush to use FileChannelWriter abstraction. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala | Pass chunk compression context into disk file creation. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala | Thread chunk compression context through writer creation and DiskFileInfo. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala | Use FileChannelWriter in local flush task rather than raw FileChannel. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala | Include per-chunk compression flags in stream handler metadata to clients. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala | Receive/pass chunk compression context from ReserveSlots to writer creation. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java | Explicitly reject sorting chunk-compressed shuffle files (not yet supported). |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java | Add ChunkCompressionContext to writer context and accessors. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileWriterType.java | New enum for file writer types. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileChannelWriterFactory.java | Factory to choose bypass vs chunk-compressed writer based on file info. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/FileChannelWriter.java | New abstraction for file write/close. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/MmapMemoryManager.java | New mmap-backed allocator for chunk buffers. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/ChunkCompressedFileChannelWriter.java | New chunk-buffering + ZSTD-compressing local file writer. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/chunk/compressed/ChunkBufferPool.java | New pool for reusable (chunk, compressed) buffer pairs. |
| worker/src/main/java/org/apache/celeborn/service/deploy/worker/file/BypassFileChannelWriter.java | New writer preserving existing direct-to-channel behavior. |
| worker/pom.xml | Add zstd-jni dependency for worker module. |
| project/CelebornBuild.scala | Add zstd JNI dependency for worker build. |
| docs/configuration/client.md | Document new chunk compression configs. |
| common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala | Update PB serde tests for new file info ctor + chunk compression context. |
| common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala | Serialize/deserialize chunk compression config into PbFileInfo. |
| common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala | Send/receive chunk compression config in ReserveSlots messages. |
| common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala | Add chunk compression config entries + accessors. |
| common/src/main/proto/TransportMessages.proto | Add PbChunkCompressionConfig and per-chunk compressed flags to stream handler. |
| common/src/main/java/org/apache/celeborn/common/network/buffer/FileChunkBuffers.java | Enforce no sliced reads for chunk-compressed files. |
| common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java | Track per-chunk compression flags in reduce metadata. |
| common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java | Add setBytesFlushed to override file length post-close. |
| common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java | Add ChunkCompressionContext and expose chunk compression settings. |
| common/src/main/java/org/apache/celeborn/common/compression/ChunkCompressionContext.java | New context object to carry chunk compression settings. |
| client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala | Send chunk compression settings to worker via ReserveSlots. |
| client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java | Return (ByteBuf, isChunkCompressed) from next(). |
| client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java | Change API to return (ByteBuf, isChunkCompressed) from next(). |
| client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java | Return per-chunk compression flag alongside the chunk buffer. |
| client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java | Return per-chunk compression flag alongside the chunk buffer. |
| client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java | Stream per-chunk ZSTD decompression and handle large-record uncompressed chunks. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| int chunkIdx = returnedChunks; | ||
| returnedChunks++; | ||
| return chunk; | ||
| // If no per-chunk list was sent (old worker), treat as compressed to honour the global flag. | ||
| boolean compressed = | ||
| streamHandler.getChunkCompressedCount() == 0 || streamHandler.getChunkCompressed(chunkIdx); | ||
| return Pair.of(chunk, compressed); |
| if (shouldDecompress) { | ||
| if (size > compressedBuf.length) { | ||
| compressedBuf = new byte[size]; | ||
| } | ||
|
|
||
| currentChunk.readBytes(compressedBuf, 0, size); | ||
| readFully(currentStream, compressedBuf, 0, size); | ||
| } else { | ||
| if (size > rawDataBuf.length) { | ||
| rawDataBuf = new byte[size]; | ||
| } | ||
|
|
||
| currentChunk.readBytes(rawDataBuf, 0, size); | ||
| readFully(currentStream, rawDataBuf, 0, size); | ||
| } |
| @Override | ||
| public ManagedBuffer chunk(int chunkIndex, int offset, int len) { | ||
| // sliced reads unsupported for chunkCompressed files | ||
| assert (!isChunkCompressed || (offset == 0 && len == Integer.MAX_VALUE)); | ||
| Tuple2<Long, Long> offsetLen = getChunkOffsetLength(chunkIndex, offset, len); |
| val CHUNK_COMPRESSION_LEVEL: ConfigEntry[Int] = | ||
| buildConf("celeborn.chunk.compression.level") | ||
| .categories("client") | ||
| .doc( | ||
| "ZSTD compression level to use for chunk-level compression " + | ||
| "(celeborn.chunk.compression.enabled must be true). " + | ||
| "Valid range is 1–22; the default (3) matches the ZSTD library default.") | ||
| .version("0.6.0") | ||
| .intConf | ||
| .createWithDefault(3) |
| @Override | ||
| public void close(boolean commitFilesFsync) { | ||
| try { | ||
| compressAndFlush(); | ||
| if (commitFilesFsync) { | ||
| channel.force(false); |
| ByteBuffer chunkBuf = MmapMemoryManager.getInstance().allocateBuffer((int) chunkSize); | ||
| // allocateDirect, NOT MmapMemoryManager: mmap duplicates share one backing region, so | ||
| // after clear() both chunkBuf and a mmap-backed compressedBuf would have position=0 | ||
| // pointing to the same physical address. ZSTD would then write its frame header to | ||
| // mmap[0..N] before reading mmap[0..N] as input, silently corrupting the source. | ||
| ByteBuffer compressedBuf = MmapMemoryManager.getInstance().allocateBuffer((int) chunkSize); |
| import java.util.logging.Logger; | ||
|
|
||
| public class MmapMemoryManager { | ||
| private static final Logger LOG = Logger.getLogger(MmapMemoryManager.class.getName()); | ||
| private static MmapMemoryManager INSTANCE; | ||
| private static final long DEFAULT_FILE_LENGTH = 512 * 1024 * 1024L; |
| returnedChunks++; | ||
| lastReturnedChunkId = chunk.getLeft(); | ||
| return chunk.getRight(); | ||
| return Pair.of(chunk.getRight(), true); | ||
| } |
SteNicholas
left a comment
There was a problem hiding this comment.
This is a substantial, genuinely valuable feature (~40% disk reduction is a big deal), and the high-level design — local-disk-only scoping, a clean FileChannelWriter factory, appended proto fields, a guard on the sort path — shows good instincts. The core streaming-decompression machinery in CelebornInputStream is sound. But as implemented it is not safe to enable yet: there are several independent paths that cause silent data corruption or silent data loss when celeborn.chunk.compression.enabled=true. None affect the default-off path, so this isn't a regression for existing users — but the criticals below should block enabling/merging.
Critical (silent data loss / corruption when enabled)
C1 — Final-chunk flush failure is silently committed as success (data loss). ChunkCompressedFileChannelWriter.write() only flushes on overflow, so the trailing partial chunk is written only in close() → compressAndFlush(). That close() catches IOException with an empty // log and ignore block, never signals the FlushNotifier, then unconditionally runs setBytesFlushed(...) + replaceFileMeta(...). Pre-PR, the final write happened on the flush thread and a failure propagated through notifier.checkException() → Controller.commitFiles → failedIds. Now a failed last-chunk write commits the partition as success with its tail records missing — no FetchFailedException, no recompute. FileChannelWriter.close(boolean) doesn't declare throws, which is the root enabler. Fix: close() must propagate (or set the notifier) so the commit fails.
C2 — The per-chunk chunkCompressed list is never persisted → corruption after worker graceful-shutdown recovery. PbReduceFileMeta has only chunkOffsets + sorted; PbSerDeUtils.fromPbFileInfo rebuilds with new ReduceFileMeta(getChunkOffsetsList), dropping chunkCompressed/chunkSize. After recovery the stream handler sends an empty list, and the reader then defaults every chunk to compressed (see C4) — so any raw large-record chunk (flushLargeRecord records false) gets fed through ZstdInputStream → corrupt/failed reads. Fix: add chunkCompressed (and chunkSize) to PbReduceFileMeta and round-trip them; add a PbSerDeUtilsTest with an enabled context (the current test only uses disabled()).
C3 — DfsPartitionReader.next() hardcodes compressed = true. It returns Pair.of(chunk.getRight(), true) and never consults the per-chunk list. DFS/S3/OSS files are always written uncompressed (StorageManager forces disabled()), and a DFS network chunk is a dfsReadChunkSize slice, not a ZSTD frame. So enabled=true + remote storage wraps every raw slice in ZstdInputStream → total corruption. Fix: return false (DFS never chunk-compresses).
C4 — Decompression keys off the reader's global config + an "absent list ⇒ compressed" default, instead of the authoritative per-chunk flag. CelebornInputStream decompresses when (chunkCompressed && currentChunkCompressed), where chunkCompressed = conf.isChunkCompressionEnabled() (reader-side), and both readers compute getChunkCompressedCount()==0 || getChunkCompressed(idx) — i.e. absent list means "assume compressed." This is the lynchpin that turns C2/C3 and version skew into corruption: a NEW client (flag on) reading an OLD worker's uncompressed file, or a recovered file, decompresses raw bytes. Fix: make the per-chunk list the sole authority — absent/false ⇒ read raw; let the global config influence writes only.
High
H1 — LocalPartitionReader indexes the per-chunk flag with a relative index. getChunkCompressed(returnedChunks) but returnedChunks is relative to startChunkIndex; for skew/AQE sub-range reads the absolute index is startChunkIndex + returnedChunks (WorkerPartitionReader correctly uses the absolute chunk index). On a mixed compressed/raw file read via skew, the wrong flag is used → silent corruption.
H2 — AQE/skew read of a chunk-compressed file throws an uncaught UnsupportedOperationException. The PartitionFilesSorter guard throws a RuntimeException, but FetchHandler.handleReduceOpenStreamInternal/handleOpenStreamInternal catch only IOException, so it escapes: no OPEN_STREAM_FAILED reply, request hangs to timeout, no metric. Either throw IOException and surface it cleanly, or — better — reject enabled + AQE/skew at reserve time so the combination can't arise.
Medium
- M1 — Compressed buffer sized at
chunkSize, notZstd.compressBound(chunkSize). A full incompressible chunk (likely, since this can stack on batch-level LZ4/ZSTD) overflows the destination and ZSTD throws; the return code also isn't checked withZstd.isError. The client's ownZstdCompressoralready usescompressBound— follow that. Combined with C1, an overflow on the last chunk = silent loss. - M2 —
MmapMemoryManager.close()has no caller; the buffers are untracked, unbounded-until-peak, and/tmp-backed. Backing files (512 MB each) leak for the worker lifetime (reclaimed only viadeleteOnExit), are not accounted by Celeborn'sMemoryManager(bypassing congestion control), and on a tmpfs/tmpare RAM the worker doesn't know it holds. Ownership/lifecycle and memory budgeting need to be wired in. - M3 —
bytesFlusheddual semantics. It's accumulated as uncompressed during flush, then overwritten with the compressed total inclose(). Works only because of strict close-after-flush ordering and full meta replacement; fragile, and disk-usage metrics (getFileLength) now report compressed while in-flightincrementDiskBuffertracks uncompressed. - M4 —
celeborn.chunk.compression.levelhas nocheckValuedespite the documented 1–22 range; out-of-range passes straight to ZSTD.
Low / Nit
ChunkBufferPoolcomment says "allocateDirect, NOT MmapMemoryManager … would silently corrupt the source," but the code allocates the compressed buffer fromMmapMemoryManager. It's safe today (the manager hands out non-overlapping slices), but the comment is actively misleading — fix it.CHUNK_COMPRESSION_ENABLEDis taggedversion("0.3.0")(and in docs) for a feature added now — should be the current release.destroy()now runs a wasted compress+write right before deleting the file.- Read metrics (
incBytesRead) count decompressed batch sizes, over-reporting vs on-wire bytes.
Suggested framing
The cleanest fix narrative ties most of the criticals together: (1) make the per-chunk list the single source of truth for decompression (absent ⇒ raw), (2) persist that list, (3) have DFS report false, and (4) make close() propagate flush failures. With those plus compressBound and the mmap lifecycle, the feature gets much closer to safely enable-able. Given the promising results and that this was opened for early discussion, it might be worth landing it explicitly gated/experimental with these tracked as blockers-before-GA.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 50 out of 50 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (4)
common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala:1
PbFileInfonow includeschunkCompressionConfig, andfromPbFileInforeconstructsDiskFileInfousingpbFileInfo.getChunkCompressionConfig. However,toPbFileInfonever setschunkCompressionConfig, so serialized metadata will always deserialize to(enabled=false, level=0)even for chunk-compressed shuffles. Populatebuilder.setChunkCompressionConfig(...)using the source file info'sChunkCompressionContextto preserve correctness across PB round-trips.
/*
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1
- If the read-back byte stream is corrupted or mis-ordered such that no expected prefix is found, this code silently drops the remainder (
remaining = \"\"). That can let tests pass incorrectly (the loop that asserts blob content only iterates over whatever was extracted). Make this fail fast (e.g., throw/assert whenNoneis hit) and assert that all expected prefixes were extracted (e.g.,result.size == prefixes.length).
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1 - The fixed
Thread.sleepcalls will make the suite significantly slower and can be flaky under load (timing-dependent). Prefer waiting on an explicit condition/ack (e.g., rely onmapperEnd/commit completion, or poll for committed state with a bounded timeout) and avoid unconditional sleeps infinally.
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ChunkCompressedReadWriteTest.scala:1 - The fixed
Thread.sleepcalls will make the suite significantly slower and can be flaky under load (timing-dependent). Prefer waiting on an explicit condition/ack (e.g., rely onmapperEnd/commit completion, or poll for committed state with a bounded timeout) and avoid unconditional sleeps infinally.
| public static FileChannelWriter getFileChannelWriter( | ||
| DiskFileInfo diskFileInfo, long chunkSize, ChunkBufferPool chunkBufferPool) | ||
| throws IOException { | ||
| if (diskFileInfo.isChunkCompressionEnabled()) { | ||
| return new ChunkCompressedFileChannelWriter( | ||
| diskFileInfo, chunkSize, diskFileInfo.getChunkCompressionLevel(), chunkBufferPool); | ||
| } else { | ||
| return new BypassFileChannelWriter(diskFileInfo); | ||
| } | ||
| } |
| @Override | ||
| public void close(boolean commitFilesFsync) throws IOException { | ||
| if (closed) { | ||
| return; | ||
| } | ||
| closed = true; | ||
| IOException failure = null; | ||
| try { | ||
| compressAndFlush(); | ||
| if (commitFilesFsync) { | ||
| channel.force(false); | ||
| } | ||
| } catch (IOException e) { | ||
| failure = e; | ||
| } finally { | ||
| try { | ||
| channel.close(); | ||
| } catch (IOException e) { | ||
| if (failure == null) { | ||
| failure = e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (failure != null) { | ||
| throw failure; | ||
| } | ||
| diskFileInfo.setBytesFlushed(chunkOffsets.get(chunkOffsets.size() - 1)); | ||
| diskFileInfo.replaceFileMeta(new ReduceFileMeta(chunkOffsets, chunkCompressed)); | ||
| chunkBufferPool.release(bufferPair); | ||
| } |
| val reduceFileMeta = fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta] | ||
| builder.setPartitionType(PartitionType.REDUCE.getValue) | ||
| builder.addAllChunkOffsets(reduceFileMeta.getChunkOffsets) | ||
| if (reduceFileMeta.getChunkCompressed != null && !reduceFileMeta.getChunkCompressed.isEmpty) { | ||
| builder.addAllChunkCompressed(reduceFileMeta.getChunkCompressed) | ||
| } | ||
| } | ||
| builder.build |
| public synchronized ByteBuffer allocateBuffer(long size) { | ||
| addFileIfNecessary(size); | ||
| ByteBuffer buffer = _currentBuffer.duplicate(); | ||
| buffer.position((int) _availableOffset); | ||
| buffer.limit((int) (_availableOffset + size)); | ||
| _availableOffset += size; | ||
| return buffer.slice(); | ||
| } | ||
|
|
||
| public void close() { | ||
| // MappedByteBuffers cannot be explicitly unmapped in Java; GC handles the unmap. | ||
| // We clear the internal state and delete the backing files so disk space is reclaimed. | ||
| _memMappedBuffers.clear(); | ||
| for (String path : _paths) { | ||
| File file = new File(path); | ||
| if (!file.delete()) { | ||
| LOG.warn("Unable to delete mmap backing file: {}", file); | ||
| } | ||
| } | ||
| _paths.clear(); | ||
| _curFileLen = -1; | ||
| _availableOffset = DEFAULT_FILE_LENGTH; | ||
| } |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 50 out of 50 changed files in this pull request and generated 7 comments.
Comments suppressed due to low confidence (2)
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:1
LocalTierWriterclosesfileChannelWriterin bothcloseStreams()andcloseResource().ChunkCompressedFileChannelWriteris idempotent via an internalclosedflag, butBypassFileChannelWriter.close()is not; a second close can throw and potentially mask the original failure / complicate cleanup. Recommendation (required): makeFileChannelWriter.close(...)idempotent for all implementations (e.g., add aclosedguard toBypassFileChannelWriter), or ensureLocalTierWriteronly closes once (e.g., track a closed flag and skip incloseResource()if already closed).
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:1LocalTierWriterclosesfileChannelWriterin bothcloseStreams()andcloseResource().ChunkCompressedFileChannelWriteris idempotent via an internalclosedflag, butBypassFileChannelWriter.close()is not; a second close can throw and potentially mask the original failure / complicate cleanup. Recommendation (required): makeFileChannelWriter.close(...)idempotent for all implementations (e.g., add aclosedguard toBypassFileChannelWriter), or ensureLocalTierWriteronly closes once (e.g., track a closed flag and skip incloseResource()if already closed).
| diskFileInfo.setBytesFlushed(chunkOffsets.get(chunkOffsets.size() - 1)); | ||
| diskFileInfo.replaceFileMeta(new ReduceFileMeta(chunkOffsets, chunkCompressed)); |
| while (true) { | ||
| if (readFully(currentStream, sizeBuf, 0, BATCH_HEADER_SIZE) < BATCH_HEADER_SIZE) { | ||
| closeCurrentStream(); | ||
| if (!moveToNextChunk()) break; | ||
| setupCurrentStream(); | ||
| continue; | ||
| } |
| } | ||
|
|
||
| currentChunk.readBytes(compressedBuf, 0, size); | ||
| readFully(currentStream, compressedBuf, 0, size); |
| } | ||
|
|
||
| currentChunk.readBytes(rawDataBuf, 0, size); | ||
| readFully(currentStream, rawDataBuf, 0, size); |
| this.localHostAddress = Utils.localHostName(conf); | ||
| this.shouldDecompress = | ||
| !conf.shuffleCompressionCodec().equals(CompressionCodec.NONE) && needDecompress; | ||
| this.chunkCompressed = conf.isChunkCompressionEnabled(); |
| currentStream = | ||
| (chunkCompressed && currentChunkCompressed) ? new ZstdInputStream(base) : base; |
| ByteBuffer[] buffers = buffer.nioBuffers(); | ||
| for (ByteBuffer byteBuffer : buffers) { | ||
| while (byteBuffer.hasRemaining()) { | ||
| chunkBuffer.put(byteBuffer); | ||
| } | ||
| } |
What changes were proposed in this pull request?
Adds chunk-level ZSTD compression on the worker write path and streaming decompression on the client read path. Records accumulate in a fixed-size chunk buffer (default 8 MB); when the buffer overflows it is compressed as a single ZSTD frame and written to disk file. On the read side, CelebornInputStream wraps each fetched chunk in a ZstdInputStream to uncompress.
This is orthogonal to the existing batch-level LZ4/ZSTD codec: both can be active simultaneously, or the batch codec can be NONE.
This change primarily helps in reducing the disk usage (~40% lower disk usage seen in tests) as well as read flow celeborn network egress.
Impl details
Writer side
FileChannelWriterinterface which supports write / close functionalities.BypassFileChannelWriteris the default and ensures the current behaviour (directly write flushBuffer to disk file channel).ChunkCompressedFileChannelWriter: Accumulates records in a directByteBufferofchunkSizebytes. On overflow, ZSTD-compresses and writes as a single frame. Records larger thanchunkSizestream directly to disk viaZstdOutputStream. Replaces compressed chunk-boundary offsets intoReduceFileMetaon close. Also updates thebytesFlushedto overwrite the FileInfo length post close. The buffers used to buffer chunkSize data before compression and flush is powered byMmapMemoryManagerandChunkBufferPoolwhich uses mmap'ed temporary files to avoid the memory overhead of buffering chunk sized data.ChunkCompressedFileChannelWriterandFileChannelWriteris made basis the new config set by client duringReserveSlots(conf.isChunkCompressionEnabled).Read side
CelebornInputStream: When reading chunkCompressed chunks, wraps the readByteBufinto a ZSTDIs to inplace decompress and read.Configs added
celeborn.chunk.compression.enabledfalseCelebornInputStream.Does this PR resolve a correctness bug?
No
Does this PR introduce any user-facing change?
celeborn.chunk.compression.enabledconfig to enable / disable chunk level compression (disabled by default)How was this patch tested?
UTs, ITs