[CELEBORN-2329][CIP22] Encryption at Rest Spark Impl#3689
Conversation
|
+CC @mridulm @rmcyang @FMX @SteNicholas PTAL, thanks. |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #3689 +/- ##
==========================================
+ Coverage 66.91% 66.98% +0.08%
==========================================
Files 358 359 +1
Lines 21986 22218 +232
Branches 1946 1968 +22
==========================================
+ Hits 14710 14881 +171
- Misses 6262 6315 +53
- Partials 1014 1022 +8 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR adds Spark-side “encryption at rest” support by introducing a pluggable crypto interface in the Celeborn client, wiring encryption into push paths and decryption into fetch/read paths, and providing a Spark-backed implementation that uses Spark’s IO encryption settings/key.
Changes:
- Introduces
CryptoHandlerand threads an optional handler throughShuffleClient/ShuffleClientImplinto push (encrypt) and read (decrypt) code paths. - Adds Spark implementation (
SparkCryptoHandler) + plumbing (SparkCommonUtils.getCryptoHandler, Spark shuffle readers/managers updated to pass the handler). - Updates tests and shaded artifacts/dependencies to include the needed crypto runtime.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| client/src/test/java/org/apache/celeborn/client/read/CelebornInputStreamPeerFailoverTest.java | Updates test calls to new CelebornInputStream.create signature with optional crypto handler. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java | Encrypts outgoing shuffle payloads when crypto is enabled; passes handler into partition reads. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClient.java | Adds ShuffleClient.get(...) overload accepting an optional crypto handler; exposes setup hook. |
| client/src/main/java/org/apache/celeborn/client/security/CryptoHandler.java | New interface for encrypt/decrypt on byte arrays. |
| client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java | Decrypts incoming batches before optional decompression and consumption. |
| client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java | Adds no-op crypto handler setup for dummy client. |
| client/pom.xml | Adds commons-crypto dependency to client module. |
| client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReaderSuite.scala | Updates static mock to match new ShuffleClient.get signature. |
| client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | Extends constructors/plumbing to carry optional crypto handler to ShuffleClient. |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Updates reflective columnar shuffle reader construction to pass optional crypto handler. |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java | Passes Spark-derived crypto handler into writers/readers. |
| client-spark/spark-3-shaded/pom.xml | Ensures commons-crypto is included in shaded Spark 3 artifact. |
| client-spark/spark-3-columnar-shuffle/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReaderSuite.scala | Updates tests to pass Optional.empty() crypto handler. |
| client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala | Threads optional crypto handler through columnar reader. |
| client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | Threads optional crypto handler through Spark 2 reader. |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java | Passes Spark-derived crypto handler into Spark 2 writers/readers. |
| client-spark/spark-2-shaded/pom.xml | Ensures commons-crypto is included in shaded Spark 2 artifact. |
| client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCryptoHandlerSuiteJ.java | Adds unit tests for Spark crypto handler behavior. |
| client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCryptoHandler.java | Implements CryptoHandler using Spark CryptoStreamUtils. |
| client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java | Derives optional crypto handler from Spark config/env IO encryption key. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
PR Review: [CELEBORN-2329][CIP22] Encryption at Rest Spark ImplOverall: Good architecture, some performance and safety concerns. The encrypt-at-client / decrypt-at-client approach is clean — workers store ciphertext, no key distribution needed to workers. Compress-then-encrypt ordering is correct. Performance Concerns1. Heavy allocation on hot path — every batch allocates multiple objects
For a typical shuffle with thousands of batches per second per executor, this generates significant GC pressure. Consider:
Moderate Issues2. Non-Spark engines (Flink, MR) using the Celeborn client now pull in 3. SparkCommonUtils.getCryptoHandler(conf) // called in getWriter
SparkCommonUtils.getCryptoHandler(conf) // called in getReader (multiple times for columnar path)Each call accesses Minor / Nits4. No AES-GCM / integrity protection Spark's 5. Test coverage is good — roundtrip, wrong key, large data, empty data, offset. Could add a test for corrupted ciphertext (flip a byte in encrypted output, verify decrypt fails or produces wrong output). 6. Debug logging on every batch logger.debug("Encrypted shuffle data for shuffle {} ...", shuffleId, ...);
logger.debug("Decrypted shuffle data for shuffle {} ...", shuffleId, ...);Even with debug disabled, the varargs boxing and string formatting preparation happens. On a hot path with millions of batches, this adds up. Consider guarding with Reviewed with Claude Code |
|
I will address the above comments in 2 weeks, I will be on vacation. |
1d92a40 to
cf8d472
Compare
Review — Encryption at Rest (Spark)Nice work. The wiring is clean and the integration points are well chosen. I traced the full write→store→read path and the design is sound. A few notes below, mostly non-blocking suggestions and a couple of questions. What works well
Suggestions / questions
Overall this looks solid and well-scoped. The main thing I'd push on before merge is item (1) — an integration test for the round-trip wiring. |
- Add upper-bound check (decryptedLength > length - 4) in SparkCryptoHandler.decrypt() to guard against OOM from corrupted or wrong-key input - Fix incBytesRead/incDuplicateBytesRead metric undercount in CelebornInputStream by tracking encryptedSize separately from the decrypted size - Fix testEncryptWithOffset to actually exercise a non-zero offset (offset=10) - Defensively handle null in ShuffleClientImpl.setupCryptoHandler to prevent NPE - Add fallback 9-arg constructor in SparkUtils.ColumnarShuffleReaderConstructorHolder for backward compatibility with older columnar-shuffle modules - Move commons-crypto dependency from client/pom.xml to client-spark/common/pom.xml so non-Spark engines (Flink, MR) no longer pull it in unnecessarily Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…uffleReader spark-3.5-columnar-shuffle and spark-4-columnar-shuffle test suites call createColumnarShuffleReader without the Optional<CryptoHandler> parameter, causing a compile error. Add a 9-arg overload that defaults to Optional.empty() so existing callers don't need to be updated. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Hi @mridulm @RexXiong @SteNicholas I addressed the above comments. Seems like some of the tests are failing due to transient errors, can you please help to re-run them? |
- Cache getCryptoHandler() result in SparkShuffleManager (Spark 2 + 3) to avoid re-reading and re-parsing the config on every shuffle read/write call; uses a volatile field for safe lazy initialization - Add CelebornInputStreamCryptoRoundTripSuiteJ: integration-style test that exercises the full encrypt-on-write / decrypt-on-read path in CelebornInputStream, including compress+encrypt ordering, integrity check compatibility, and large payloads Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
What changes were proposed in this pull request?
Adds EAR support for Spark side.
See more details in doc.
Why are the changes needed?
See more details in doc.
Does this PR resolve a correctness bug?
no
Does this PR introduce any user-facing change?
no
How was this patch tested?
unit tests and tested in production internally.