Add tuple compression for inter-worker communication #8707
rzo1 left a comment
Thanks for the PR. The feature is well-scoped (opt-in, per-component, threshold-gated) and the test coverage is solid. The decompression-bomb defense via the bounded ZstdUtils.decompress cap is the right control. A few things to address, mostly around the receive path.
1. (Blocking) The blank catch (RuntimeException) in KryoTupleDeserializer.deserialize masks the security-cap signal.
The catch silently swallows decompress()'s "Decompression threshold exceeded! Possible security risk" rejection and re-parses the still-compressed bytes, surfacing it as a generic "Failed to deserialize tuple". A cap-exceeded payload is a valid zstd frame (it decompressed far enough to bust the limit), so it is definitely not a magic false-positive and must propagate, not fall back. Please distinguish "cap exceeded / malformed frame" (propagate the original exception) from a true header collision (the only case where retry-as-raw is justified), so the security-relevant rejection isn't lost.
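The distinction requested above could look roughly like the following sketch. It assumes the bounded decompressor can signal the two cases with different exception types; `FrameRejectedException`, `NotAFrameException`, `Decompressor`, and `unwrap` are hypothetical names invented for illustration and are not part of the PR or of Storm.

```java
/** Sketch: fall back to raw parsing only on a header collision; let every other failure propagate. */
final class ReceivePathSketch {

    /** HYPOTHETICAL: the payload is a zstd frame, but it exceeded the cap or is malformed. */
    static final class FrameRejectedException extends RuntimeException {
        FrameRejectedException(String message) {
            super(message);
        }
    }

    /** HYPOTHETICAL: the bytes start with the magic but are not a zstd frame at all. */
    static final class NotAFrameException extends RuntimeException {
        NotAFrameException(String message) {
            super(message);
        }
    }

    /** Stand-in for the bounded decompress call; the real one lives in ZstdUtils. */
    interface Decompressor {
        byte[] decompress(byte[] payload);
    }

    static byte[] unwrap(byte[] payload, Decompressor decompressor) {
        try {
            return decompressor.decompress(payload);
        } catch (NotAFrameException collision) {
            // Header collision: the bytes were an uncompressed tuple all along, so hand them back as-is.
            return payload;
        }
        // FrameRejectedException is deliberately not caught, so the security-relevant
        // rejection reaches the caller with its original message.
    }

    public static void main(String[] args) {
        byte[] raw = {0x28, 0x01};
        byte[] result = unwrap(raw, p -> {
            throw new NotAFrameException("magic matched by accident");
        });
        System.out.println("fell back to raw bytes: " + (result == raw));
    }
}
```

The point of the sketch is the narrow catch: a broad `catch (RuntimeException)` would merge both cases again.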
Relatedly, the justifying comment is incorrect. zstd's magic 0xFD2FB528 is little-endian on the wire, so the byte isZstd matches at offset 0 is 0x28, not 0xFD, and a Kryo writeInt(taskId, true) of 40 produces exactly 0x28. What actually makes a collision astronomically unlikely is the second field (it would require streamId == 6069), not the taskId range. Please correct the comment so the reasoning matches reality.
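The byte arithmetic behind this argument can be checked with a small standalone sketch. The encoder below is hand-written and assumes that Kryo's `writeInt(value, true)` emits positive values as 7-bit groups, least significant first, with the high bit as a continuation flag; `MagicCollisionSketch` and its methods are illustrative names only.

```java
import java.io.ByteArrayOutputStream;

/** Sketch: reproduces the byte arithmetic behind the zstd-magic collision argument. */
final class MagicCollisionSketch {

    // zstd frame magic 0xFD2FB528 as it appears on the wire (little-endian).
    static final byte[] ZSTD_MAGIC_LE = {(byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD};

    /**
     * Encodes a non-negative int as 7-bit groups, least significant first, with the
     * high bit marking continuation. ASSUMPTION: this mirrors what Kryo's
     * writeInt(value, true) produces for positive values.
     */
    static byte[] encodeVarInt(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(5);
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    /** Formats bytes as space-separated upper-case hex pairs. */
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02X ", b));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println("taskId 40     -> " + hex(encodeVarInt(40)));
        System.out.println("streamId 6069 -> " + hex(encodeVarInt(6069)));
        System.out.println("zstd magic    -> " + hex(ZSTD_MAGIC_LE));
    }
}
```

Under that assumption, 40 encodes to the single byte 0x28 and 6069 to 0xB5 0x2F (53 with the continuation bit set, then 47, since 47 * 128 + 53 = 6069), which lines up with the first three bytes of the little-endian magic.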
2. Use a dedicated decompression cap for tuples.
The deserializer reuses storm.compression.zstd.max.decompressed.bytes (default 100 MB), which is shared with cluster-state serialization. 100 MB per tuple on the inter-worker hot path is very high. Please add a dedicated key (e.g. topology.tuple.compression.max.decompressed.bytes, @IsPositiveNumber) so the per-tuple bound can be tuned and lowered independently of cluster-state. A lower default would be worth discussing here.
3. Only enter the decompress branch when the topology actually uses compression.
The magic-byte check itself is effectively free, so no concern there. But the decompress branch is currently reachable for every topology's inter-worker traffic, including ones that never enable the feature. Since workers are per-topology, you can compute once at KryoTupleDeserializer construction whether any component in the topology enables compression (scan the merged component configs from the context's raw topology) and only take the decompress branch if so. Zero per-tuple cost, and topologies that don't use the feature never touch the zstd path at all. Good defense-in-depth alongside items 1 and 2.
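A minimal sketch of that construction-time gate follows. It is not the PR's code: the class, the shape of the component-config map, and the key name `topology.tuple.compression.enable` are all assumptions made for illustration, since the actual enable key is not quoted in this thread.

```java
import java.util.Map;

/** Sketch: decide once, at construction, whether the zstd branch is reachable at all. */
final class CompressionGateSketch {

    // HYPOTHETICAL key name; substitute whatever key the PR actually defines.
    static final String ENABLE_KEY = "topology.tuple.compression.enable";

    private final boolean anyComponentCompresses;

    /** componentConfigs maps component id to that component's merged config (assumed shape). */
    CompressionGateSketch(Map<String, Map<String, Object>> componentConfigs) {
        boolean any = false;
        for (Map<String, Object> conf : componentConfigs.values()) {
            if (Boolean.TRUE.equals(conf.get(ENABLE_KEY))) {
                any = true;
                break;
            }
        }
        this.anyComponentCompresses = any;
    }

    /** Checks for the little-endian zstd frame magic 28 B5 2F FD at offset 0. */
    static boolean startsWithZstdMagic(byte[] p) {
        return p.length >= 4
            && p[0] == (byte) 0x28
            && p[1] == (byte) 0xB5
            && p[2] == (byte) 0x2F
            && p[3] == (byte) 0xFD;
    }

    /** Per-tuple check: one boolean read, then the magic comparison only if the feature is in use. */
    boolean shouldTryDecompress(byte[] payload) {
        return anyComponentCompresses && startsWithZstdMagic(payload);
    }
}
```

With this shape, a topology in which no component sets the key never reaches the decompressor, even if a payload happens to begin with the magic bytes.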
4. Flux: the recommended per-component usage isn't expressible today.
The docs recommend enabling this per component via declarer.addConfiguration(...), but Flux has no per-component config mechanism. FluxBuilder only applies parallelism/memory/CPU/numTasks/groupings to the declarers, and config: is topology-scope only. So Flux users can only enable it topology-wide, which is the mode the PR advises against. Preferred fix: add a per-component config: map to BoltDef/SpoutDef, applied via declarer.addConfigurations(...) in FluxBuilder. If that's out of scope for this PR, please open a tracking issue before merge so it doesn't drift, and have docs/Serialization.md state explicitly that, via Flux, only topology-wide enablement is currently possible (with the config: snippet).
Minor:
- Config.java javadoc for the threshold says "equals or exceeds ... will be compressed", but the code is dataLength > threshold (strict) and docs/Serialization.md says "at or below ... uncompressed". Align the Config.java javadoc to the code.
- FileReadWordCountSpoutCompressionTopo.TOPOLOGY_NAME = "FileReadWordCountTopo" collides with the existing example, so the two can't coexist on a cluster. Rename.
- Tests pass threshold = 0 directly into the conf map. With @IsPositiveNumber that value may be rejected by real config validation, which is fine for the test, just noting it isn't a usable production value.
I'll review this ASAP
Thanks for the detailed and insightful review @rzo1! I have addressed all your points and pushed the updates: Refactored the catch block in
@reiabreu, the PR is ready for your review whenever you have time. Standing by for any further feedback or changes you might suggest!
// on the wire, the first byte checked is 0x28. A Kryo writeInt(taskId, true) of 40 yields exactly 0x28.
// The collision is prevented not by the taskId range, but by the second field (streamId),
// which would rigidly have to equal 6069 to match the remaining magic bytes.
// Branch retained for correctness in case of an accidental collision.
do we want a log message here?
byte[] rawBytes = kryoOut.getBuffer();
int dataLength = kryoOut.position();

if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {
I ran this by AI and it proposed a low-level optimization that is very interesting:
Minor area where we can optimize memory allocations and garbage collection overhead. In the current compression path, we perform two allocations and two copy operations before the final payload is returned:
- Arrays.copyOf(rawBytes, dataLength) inside KryoTupleSerializer to extract the active slice.
- bos.toByteArray() inside Utils.ZstdUtils.compress to extract the compressed data.
We can cut this in half by introducing an offset-aware signature for Utils.ZstdUtils.compress. This lets us stream the active slice of the pre-allocated Kryo buffer directly into Zstd, avoiding the first copy entirely.
Here is the proposed change:
1. In Utils.java (ZstdUtils class)
Add a helper signature that accepts an offset and length:
public static byte[] compress(byte[] data, int offset, int length, int compressionLevel) {
    if (data == null || length == 0) {
        return new byte[0];
    }
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream(length)) {
        try (ZstdCompressorOutputStream zstdOut = ZstdCompressorOutputStream.builder()
                .setOutputStream(bos)
                .setBufferSize(BUFFER_SIZE)
                .setLevel(compressionLevel)
                .get()) {
            zstdOut.write(data, offset, length); // Write the slice directly
            zstdOut.finish();
        }
        return bos.toByteArray();
    } catch (IOException e) {
        throw new RuntimeException("Zstd compression failed", e);
    }
}

// Keep the existing signature for backwards compatibility
public static byte[] compress(byte[] data, int compressionLevel) {
    return compress(data, 0, data == null ? 0 : data.length, compressionLevel);
}
2. In KryoTupleSerializer.java
Update the serialization call site to pass the raw buffer along with its offset and length directly:
-    if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {
-        byte[] bytesToCompress = Arrays.copyOf(rawBytes, dataLength);
-        return Utils.ZstdUtils.compress(bytesToCompress, this.zstdCompressionLevel);
-    } else {
-        return Arrays.copyOf(rawBytes, dataLength);
-    }
+    if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {
+        return Utils.ZstdUtils.compress(rawBytes, 0, dataLength, this.zstdCompressionLevel);
+    } else {
+        return Arrays.copyOf(rawBytes, dataLength);
+    }
This simple optimization reduces GC allocation and copy overhead by 50% for every compressed tuple on the serialization side.
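The slice-writing idea can be tried without any Zstd dependency. The sketch below swaps in the JDK's `GZIPOutputStream` purely as a stand-in for `ZstdCompressorOutputStream`; the technique it illustrates is the `write(buffer, offset, length)` call, which consumes only the live part of an oversized buffer with no intermediate `Arrays.copyOf`. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Sketch: compress only the live slice of a larger buffer, using GZIP as a stand-in for zstd. */
final class SliceCompressSketch {

    /** Compresses buffer[offset, offset + length) without first copying the slice out. */
    static byte[] compressSlice(byte[] buffer, int offset, int length) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream(Math.max(32, length));
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(buffer, offset, length); // stream the slice directly
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The compressor is closed here, so the trailer has been flushed into bos.
        return bos.toByteArray();
    }

    /** Round-trip helper used only to check the result. */
    static byte[] decompress(byte[] compressed) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                bos.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        byte[] kryoLikeBuffer = new byte[64]; // oversized buffer, as a serializer's output buffer would be
        int dataLength = 10;                  // only the first 10 bytes are live
        for (int i = 0; i < dataLength; i++) {
            kryoLikeBuffer[i] = (byte) ('a' + i);
        }
        byte[] restored = decompress(compressSlice(kryoLikeBuffer, 0, dataLength));
        System.out.println("restored length: " + restored.length);
    }
}
```

The remaining allocation is the final `toByteArray()` copy, which matches the second of the two copies listed in the comment above.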
@GGraziadei Thanks for this contribution. It's elegant, clean and a very nice addition to Storm.
- **Intra-worker (local) traffic** bypasses `KryoTupleSerializer` altogether, so it is never compressed regardless of configuration. You do not pay any CPU cost for tuples that stay inside a worker process.
- **Inter-worker (remote) traffic** is compressed only when compression is enabled for the source component *and* the serialized tuple is larger than the configured threshold. Small tuples (single words, IDs, etc.) are left uncompressed, since the framing overhead of a compressed payload can exceed the original size.

#### Enabling compression per component
Since you did some benchmarks, what do you think about including them? With the obvious disclaimer that the tests were done in a limited capacity within the PR and should only be used as a guide.
What is the purpose of the change
I would like to address the scenarios where compressing serialized tuples actually provides a performance benefit and introduce a way to control it.
The only scenario where compressing a serialized tuple makes sense is during inter-worker communication where the developer expects very large tuple sizes.
A perfect example within the codebase is examples/FileReadWordCountTopo. In this topology, the FileReadSpout emits entire lines/sentences of text to the SplitSentenceBolt. If these two components end up on different remote workers, compressing the serialized tuples on this specific stream would drastically reduce network I/O.
I added more details in docs/Serialization.md.
How was the change tested
Benchmark
I executed a benchmark to evaluate the improvements introduced by this PR.
I used the dev Storm cluster environment proposed in this PR:
#8706
I executed the WordCount topology in two versions:
- FileReadWordCountSpoutCompressionTopo (with compression)
- FileReadWordCountTopo (without compression)
The transmitted sentences are all approximately 1500 bytes (contained in longrandomwords.txt). This value was intentionally selected to reproduce cases where the payload spans more than one TCP segment.
I simulated a network with 10 ms latency and 0.5 ms jitter between workers.
This setup does not exactly represent a typical intra-DC network, but it emphasizes the maximum potential advantage that this PR can provide when properly configured. Consider that I am using the minimum tuple size that permits a real advantage.
Simulated Network Ping
This is a sample ping between two supervisors in the docker network (round-trip)
Benchmark Results
Grafana snapshots with v2 metrics per task
FileReadWordCountTopo
https://snapshots.raintank.io/dashboard/snapshot/T0Z6BqAnQlA0aMQMypNgAaHN70NZa0T4?orgId=0&from=2026-05-23T11:08:29.369Z&to=2026-05-23T11:16:41.770Z&timezone=browser&var-topology=FileReadWordCountTopo-2-1779534460&var-host=$__all&var-component=$__all&var-task=$__all&refresh=10s
FileReadWordCountSpoutCompressionTopo
https://snapshots.raintank.io/dashboard/snapshot/hIZ7uQuN4w0C6hunVmY1C9Hh0sGUYg0y?orgId=0&from=2026-05-23T11:03:22.690Z&to=2026-05-23T11:11:48.769Z&timezone=browser&var-topology=FileReadWordCountTopo-1-1779534165&var-host=$__all&var-component=$__all&var-task=$__all&refresh=10s
In the context of #8701