Add tuple compression for inter-worker communication #8707

Open
GGraziadei wants to merge 11 commits into apache:master from GGraziadei:8701-component-level-tuple-compression

@GGraziadei

What is the purpose of the change

This change targets the scenarios where compressing serialized tuples actually improves performance, and introduces a way to control it.
Compressing a serialized tuple only makes sense for inter-worker communication where the developer expects very large tuples.
A perfect example within the codebase is examples/FileReadWordCountTopo. In this topology, the FileReadSpout emits entire lines/sentences of text to the SplitSentenceBolt. If these two components end up on different remote workers, compressing the serialized tuples on this specific stream would drastically reduce network I/O.

I added more details in docs/Serialization.md

How was the change tested

  • Unit test
  • Smoke test on cluster env

Benchmark

I executed a benchmark to evaluate the improvements introduced by this PR.

I used the dev Storm cluster environment proposed in this PR:
#8706

I executed the WordCount topology in two versions:

  • FileReadWordCountSpoutCompressionTopo (with compression)
  • FileReadWordCountTopo (without compression)

The transmitted sentences are all approximately 1500 bytes (contained in longrandomwords.txt).
This value was intentionally selected to reproduce cases where the payload spans more than one TCP segment.

I simulated a network with 10 ms latency and 0.5 ms jitter between workers.

This setup does not exactly represent a typical intra-DC network, but it highlights the maximum potential benefit this PR can provide when properly configured. Keep in mind that I am using the smallest tuple size at which compression gives a real advantage.


Simulated Network Ping

This is a sample ping between two supervisors in the Docker network (times are round-trip):

./netsim.sh ping
==> RTT from cluster-supervisor1-1 to cluster-supervisor2-1 (5 pings)

PING cluster-supervisor2-1 (172.22.0.9) 56(84) bytes of data.
64 bytes from cluster-supervisor2-1.cluster_storm (172.22.0.9): icmp_seq=1 ttl=64 time=42.5 ms
64 bytes from cluster-supervisor2-1.cluster_storm (172.22.0.9): icmp_seq=2 ttl=64 time=18.8 ms
64 bytes from cluster-supervisor2-1.cluster_storm (172.22.0.9): icmp_seq=3 ttl=64 time=20.0 ms
64 bytes from cluster-supervisor2-1.cluster_storm (172.22.0.9): icmp_seq=4 ttl=64 time=20.4 ms
64 bytes from cluster-supervisor2-1.cluster_storm (172.22.0.9): icmp_seq=5 ttl=64 time=20.1 ms

--- cluster-supervisor2-1 ping statistics ---
5 packets transmitted, 5 received, 0% packet loss, time 4004ms
rtt min/avg/max/mdev = 18.767/24.353/42.486/9.083 ms

Benchmark Results

| Metric | FileReadWordCountSpoutCompressionTopo | FileReadWordCountTopo | Difference | Better |
|---|---|---|---|---|
| Avg Transfer Rate (msg/s) | 776,389 | 744,544 | +31,845 (+4.3%) | Compression |
| Peak Transfer Rate (msg/s) | 805,700 | 790,300 | +15,400 | Compression |
| Avg Spout Throughput (acks/s) | 98,167 | 92,844 | +5,323 (+5.7%) | Compression |
| Peak Spout Throughput (acks/s) | 100,300 | 98,666 | +1,634 | Compression |
| Avg Complete Latency (ms) | 362.48 | 376.73 | -14.25 ms (-3.8%) | Compression |
| Max Complete Latency (ms) | 366.44 | 385.72 | -19.28 ms | Compression |
| Runtime Stability | More consistent | More fluctuation | - | Compression (see the per-task jitter in the Grafana snapshots) |

Grafana snapshots with v2 metrics per task

FileReadWordCountTopo

https://snapshots.raintank.io/dashboard/snapshot/T0Z6BqAnQlA0aMQMypNgAaHN70NZa0T4?orgId=0&from=2026-05-23T11:08:29.369Z&to=2026-05-23T11:16:41.770Z&timezone=browser&var-topology=FileReadWordCountTopo-2-1779534460&var-host=$__all&var-component=$__all&var-task=$__all&refresh=10s

FileReadWordCountSpoutCompressionTopo

https://snapshots.raintank.io/dashboard/snapshot/hIZ7uQuN4w0C6hunVmY1C9Hh0sGUYg0y?orgId=0&from=2026-05-23T11:03:22.690Z&to=2026-05-23T11:11:48.769Z&timezone=browser&var-topology=FileReadWordCountTopo-1-1779534165&var-host=$__all&var-component=$__all&var-task=$__all&refresh=10s

In the context of #8701

@GGraziadei GGraziadei changed the title 8701 component level tuple compression Add tuple compression for inter-worker communication May 23, 2026

@rzo1 rzo1 left a comment

Thanks for the PR. The feature is well-scoped (opt-in, per-component, threshold-gated) and the test coverage is solid. The decompression-bomb defense via the bounded ZstdUtils.decompress cap is the right control. A few things to address, mostly around the receive path.

1. (Blocking) The blanket catch (RuntimeException) in KryoTupleDeserializer.deserialize masks the security-cap signal.
The catch silently swallows decompress()'s "Decompression threshold exceeded! Possible security risk" rejection and re-parses the still-compressed bytes, surfacing it as a generic "Failed to deserialize tuple". A cap-exceeded payload is a valid zstd frame (it decompressed far enough to bust the limit), so it is definitely not a magic false-positive and must propagate, not fall back. Please distinguish "cap exceeded / malformed frame" (propagate the original exception) from a true header collision (the only case where retry-as-raw is justified), so the security-relevant rejection isn't lost.

Relatedly, the justifying comment is incorrect. zstd's magic 0xFD2FB528 is little-endian on the wire, so the byte isZstd matches at offset 0 is 0x28, not 0xFD, and a Kryo writeInt(taskId, true) of 40 produces exactly 0x28. What actually makes a collision astronomically unlikely is the second field (it would require streamId == 6069), not the taskId range. Please correct the comment so the reasoning matches reality.
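The byte arithmetic in this point can be checked with a short sketch. The `varInt` helper below is a hand-written base-128 encoder that mirrors what Kryo's `writeInt(value, true)` is understood to emit for non-negative values; it is an assumption for illustration and does not call Kryo. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;

public class MagicCollisionSketch {
    // zstd frame magic number 0xFD2FB528 as it appears on the wire (little-endian).
    static final byte[] ZSTD_MAGIC_LE = {(byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD};

    // Base-128 varint for non-negative ints: 7 payload bits per byte, low-order group
    // first, high bit set on every byte except the last. Assumed to match the layout of
    // Kryo's writeInt(value, true); this is not Kryo code.
    static byte[] varInt(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02X ", b & 0xFF));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println("taskId 40     -> " + hex(varInt(40)));      // 28
        System.out.println("streamId 6069 -> " + hex(varInt(6069)));    // B5 2F
        System.out.println("zstd magic    -> " + hex(ZSTD_MAGIC_LE));   // 28 B5 2F FD
    }
}
```

So a tuple from task 40 on stream id 6069 would reproduce the first three magic bytes, and the byte that follows would still have to be 0xFD for the full four-byte match.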

2. Use a dedicated decompression cap for tuples.
The deserializer reuses storm.compression.zstd.max.decompressed.bytes (default 100 MB), which is shared with cluster-state serialization. 100 MB per tuple on the inter-worker hot path is very high. Please add a dedicated key (e.g. topology.tuple.compression.max.decompressed.bytes, @IsPositiveNumber) so the per-tuple bound can be tuned and lowered independently of cluster-state. A lower default would be worth discussing here.
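To illustrate why a small per-tuple cap matters, here is a minimal sketch of a bounded decompression read. It uses `java.util.zip` GZIP streams purely as a stand-in for zstd so that it runs with the JDK alone; the class, the `readBounded` helper and the 64 KB cap are hypothetical and are not taken from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class BoundedDecompressSketch {
    // Drains a decompressing stream but refuses to produce more than maxBytes of output.
    static byte[] readBounded(InputStream decompressed, long maxBytes) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        long total = 0;
        int n;
        while ((n = decompressed.read(buf)) != -1) {
            total += n;
            if (total > maxBytes) {
                throw new IOException("Decompressed size exceeds cap of " + maxBytes + " bytes");
            }
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    // GZIP is only a stand-in codec here; the PR itself uses zstd.
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }

    static byte[] gunzipBounded(byte[] compressed, long maxBytes) throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return readBounded(in, maxBytes);
        }
    }

    public static void main(String[] args) throws IOException {
        long tupleCap = 64 * 1024; // hypothetical per-tuple cap, far below a 100 MB shared cap
        byte[] bomb = gzip(new byte[1_000_000]); // 1 MB of zeros shrinks to a tiny payload
        System.out.println("compressed size: " + bomb.length + " bytes");
        try {
            gunzipBounded(bomb, tupleCap);
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a shared 100 MB cap the same payload would be fully expanded on the hot path before anything could reject it; a dedicated, lower cap stops the read after the first few buffers.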

3. Only enter the decompress branch when the topology actually uses compression.
The magic-byte check itself is effectively free, so no concern there. But the decompress branch is currently reachable for every topology's inter-worker traffic, including ones that never enable the feature. Since workers are per-topology, you can compute once at KryoTupleDeserializer construction whether any component in the topology enables compression (scan the merged component configs from the context's raw topology) and only take the decompress branch if so. Zero per-tuple cost, and topologies that don't use the feature never touch the zstd path at all. Good defense-in-depth alongside items 1 and 2.
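A rough sketch of the "compute once at construction" idea follows. It models component configs as plain maps rather than using Storm's topology context, and the key name `topology.tuple.compression.enable` is a hypothetical placeholder, since the actual key is not quoted in this thread.

```java
import java.util.HashMap;
import java.util.Map;

public class CompressionFlagSketch {
    // Hypothetical key name, used only for this illustration.
    static final String ENABLE_KEY = "topology.tuple.compression.enable";

    static boolean anyComponentEnablesCompression(Map<String, Object> topoConf,
                                                  Map<String, Map<String, Object>> componentConfs) {
        if (Boolean.TRUE.equals(topoConf.get(ENABLE_KEY))) {
            return true;
        }
        for (Map<String, Object> conf : componentConfs.values()) {
            if (Boolean.TRUE.equals(conf.get(ENABLE_KEY))) {
                return true;
            }
        }
        return false;
    }

    // Little-endian zstd magic 0xFD2FB528 at offset 0.
    static boolean looksLikeZstd(byte[] p) {
        return p.length >= 4 && p[0] == (byte) 0x28 && p[1] == (byte) 0xB5
                && p[2] == (byte) 0x2F && p[3] == (byte) 0xFD;
    }

    static final class Deserializer {
        private final boolean compressionPossible; // computed once, never per tuple

        Deserializer(Map<String, Object> topoConf, Map<String, Map<String, Object>> componentConfs) {
            this.compressionPossible = anyComponentEnablesCompression(topoConf, componentConfs);
        }

        // Returns which path a payload would take; a real deserializer would act on it.
        String route(byte[] payload) {
            return (compressionPossible && looksLikeZstd(payload)) ? "decompress" : "raw";
        }
    }

    public static void main(String[] args) {
        byte[] magicPrefixed = {(byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD, 0x00};

        Map<String, Map<String, Object>> components = new HashMap<>();
        components.put("split", new HashMap<>());
        Deserializer plainTopology = new Deserializer(new HashMap<>(), components);
        System.out.println(plainTopology.route(magicPrefixed)); // raw

        Map<String, Object> spoutConf = new HashMap<>();
        spoutConf.put(ENABLE_KEY, true);
        components.put("spout", spoutConf);
        Deserializer compressingTopology = new Deserializer(new HashMap<>(), components);
        System.out.println(compressingTopology.route(magicPrefixed)); // decompress
    }
}
```

The flag is a final field, so a topology that never enables the feature short-circuits before the magic-byte comparison and never reaches the decompression path.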

4. Flux: the recommended per-component usage isn't expressible today.
The docs recommend enabling this per component via declarer.addConfiguration(...), but Flux has no per-component config mechanism. FluxBuilder only applies parallelism/memory/CPU/numTasks/groupings to the declarers, and config: is topology-scope only. So Flux users can only enable it topology-wide, which is the mode the PR advises against. Preferred fix: add a per-component config: map to BoltDef/SpoutDef, applied via declarer.addConfigurations(...) in FluxBuilder. If that's out of scope for this PR, please open a tracking issue before merge so it doesn't drift, and have docs/Serialization.md state explicitly that, via Flux, only topology-wide enablement is currently possible (with the config: snippet).

Minor:

  • Config.java javadoc for the threshold says "equals or exceeds ... will be compressed", but the code is dataLength > threshold (strict) and docs/Serialization.md says "at or below ... uncompressed". Align the Config.java javadoc to the code.
  • FileReadWordCountSpoutCompressionTopo.TOPOLOGY_NAME = "FileReadWordCountTopo" collides with the existing example, so the two can't coexist on a cluster. Rename.
  • Tests pass threshold = 0 directly into the conf map. With @IsPositiveNumber that value may be rejected by real config validation, which is fine for the test, just noting it isn't a usable production value.
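The strict comparison mentioned in the first minor point can be made concrete with a tiny sketch. The helper name and the threshold value of 1024 are hypothetical; only the `dataLength > threshold` condition comes from the thread.

```java
public class ThresholdSketch {
    // Mirrors the condition quoted in the review: strictly greater than the threshold.
    static boolean shouldCompress(boolean enabled, int dataLength, int threshold) {
        return enabled && dataLength > threshold;
    }

    public static void main(String[] args) {
        int threshold = 1024; // hypothetical value for illustration
        System.out.println(shouldCompress(true, 1023, threshold));  // false
        System.out.println(shouldCompress(true, 1024, threshold));  // false: "at or below" stays uncompressed
        System.out.println(shouldCompress(true, 1025, threshold));  // true
        System.out.println(shouldCompress(false, 4096, threshold)); // false: feature disabled
    }
}
```

A tuple whose serialized size equals the threshold is left uncompressed, which is the behavior the javadoc wording should describe.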

@rzo1 rzo1 added this to the 3.0.0 milestone May 24, 2026
@rzo1 rzo1 requested review from asf-security-reporting and reiabreu and removed request for asf-security-reporting May 24, 2026 17:34
@reiabreu

I'll review this ASAP

@GGraziadei

Thanks for the detailed and insightful review @rzo1! I have addressed all your points and pushed the updates:

  • Refactored the catch block in KryoTupleDeserializer#deserialize by inverting the logic to handle security risks correctly. If a decompression bomb is detected (the exception message contains "Decompression threshold exceeded"), the RuntimeException is now propagated immediately instead of being swallowed. For any other decompression failure, which indicates a true header collision where raw Kryo bytes matched the zstd magic header by chance, the code safely falls back to deserializeTuple(ser). Additionally, the comment was updated to accurately reflect the little-endian nature of the zstd magic header on the wire.
  • Optimized the deserializer path. It now computes isCompressionEnabled once at construction time by scanning the topology and component configurations, ensuring zero per-tuple cost for topologies that do not use compression.
  • Since the Flux per-component configuration mechanism is out of scope for this PR, I have opened a tracking issue here: #8710 and updated docs/Serialization.md to explicitly state that Flux currently only supports topology-wide enablement.
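The propagate-or-fall-back control flow described in the first point can be sketched as below. This version distinguishes the cap-exceeded case by exception type rather than by message text, which is a swapped-in variation and not what the PR is stated to do; `DecompressionLimitExceededException`, `Decompressor` and the `Function` stand-in for Kryo are all hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class DeserializeFallbackSketch {
    // Hypothetical typed signal for "decompression cap exceeded".
    static class DecompressionLimitExceededException extends RuntimeException {
        DecompressionLimitExceededException(String message) {
            super(message);
        }
    }

    // Stand-in for the zstd decompression call.
    interface Decompressor {
        byte[] decompress(byte[] compressed);
    }

    static String deserialize(byte[] ser, Decompressor zstd, Function<byte[], String> kryo) {
        byte[] plain;
        try {
            plain = zstd.decompress(ser);
        } catch (DecompressionLimitExceededException e) {
            throw e; // security-relevant rejection: propagate, never fall back
        } catch (RuntimeException e) {
            return kryo.apply(ser); // treated as a magic-header collision: parse the raw bytes
        }
        // Kept outside the try so a failure while parsing decompressed bytes is not
        // mistaken for a collision and retried as raw.
        return kryo.apply(plain);
    }

    public static void main(String[] args) {
        Function<byte[], String> kryo = b -> new String(b, StandardCharsets.UTF_8);
        byte[] ser = "raw-tuple".getBytes(StandardCharsets.UTF_8);

        // Collision case: decompression fails for a reason other than the cap.
        System.out.println(deserialize(ser, b -> {
            throw new IllegalStateException("not a zstd frame");
        }, kryo)); // raw-tuple

        // Bomb case: the cap-exceeded signal reaches the caller unchanged.
        try {
            deserialize(ser, b -> {
                throw new DecompressionLimitExceededException("cap exceeded");
            }, kryo);
        } catch (DecompressionLimitExceededException e) {
            System.out.println("propagated: " + e.getMessage()); // propagated: cap exceeded
        }
    }
}
```

Matching on a type avoids coupling the deserializer to the exact wording of the error message, at the cost of adding one exception class.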

@reiabreu, the PR is ready for your review whenever you have time. Standing by for any further feedback or changes you might suggest!

// on the wire, the first byte checked is 0x28. A Kryo writeInt(taskId, true) of 40 yields exactly 0x28.
// The collision is prevented not by the taskId range, but by the second field (streamId),
// which would rigidly have to equal 6069 to match the remaining magic bytes.
// Branch retained for correctness in case of an accidental collision.

do we want a log message here?

byte[] rawBytes = kryoOut.getBuffer();
int dataLength = kryoOut.position();

if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {

@reiabreu reiabreu May 26, 2026

I ran this by an AI tool, and it proposed an interesting low-level optimization.

There is a minor opportunity to reduce memory allocations and garbage-collection overhead. In the current compression path, we perform two allocations and two copy operations before the final payload is returned:

  1. Arrays.copyOf(rawBytes, dataLength) inside KryoTupleSerializer to extract the active slice.
  2. bos.toByteArray() inside Utils.ZstdUtils.compress to extract the compressed data.

We can cut this in half by introducing an offset-aware signature for Utils.ZstdUtils.compress . This lets us stream the active slice of the pre-allocated Kryo buffer directly into Zstd, avoiding the first copy entirely.

Here is the proposed change:

1. In Utils.java ( ZstdUtils class)

Add a helper signature that accepts an offset and length :

public static byte[] compress(byte[] data, int offset, int length, int compressionLevel) {
    if (data == null || length == 0) {
        return new byte[0];
    }

    try (ByteArrayOutputStream bos = new ByteArrayOutputStream(length)) {
        try (ZstdCompressorOutputStream zstdOut = ZstdCompressorOutputStream.builder()
                .setOutputStream(bos)
                .setBufferSize(BUFFER_SIZE)
                .setLevel(compressionLevel)
                .get()) {
            zstdOut.write(data, offset, length); // Write the slice directly
            zstdOut.finish();
        }
        return bos.toByteArray();
    } catch (IOException e) {
        throw new RuntimeException("Zstd compression failed", e);
    }
}

// Keep the existing signature for backwards compatibility
public static byte[] compress(byte[] data, int compressionLevel) {
    return compress(data, 0, data == null ? 0 : data.length, compressionLevel);
}

2. In KryoTupleSerializer.java

Update the serialization call site to pass the raw buffer along with its offset and length directly:

-            if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {
-                byte[] bytesToCompress = Arrays.copyOf(rawBytes, dataLength);
-                return Utils.ZstdUtils.compress(bytesToCompress, this.zstdCompressionLevel);
-            } else {
-                return Arrays.copyOf(rawBytes, dataLength);
-            }
+            if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {
+                return Utils.ZstdUtils.compress(rawBytes, 0, dataLength, this.zstdCompressionLevel);
+            } else {
+                return Arrays.copyOf(rawBytes, dataLength);
+            }

This simple optimization halves the allocations and copies (from two of each to one of each) for every compressed tuple on the serialization side.

@reiabreu

@GGraziadei Thanks for this contribution. It's elegant, clean and a very nice addition to Storm

Comment thread docs/Serialization.md
- **Intra-worker (local) traffic** bypasses `KryoTupleSerializer` altogether, so it is never compressed regardless of configuration. You do not pay any CPU cost for tuples that stay inside a worker process.
- **Inter-worker (remote) traffic** is compressed only when compression is enabled for the source component *and* the serialized tuple is larger than the configured threshold. Small tuples (single words, IDs, etc.) are left uncompressed, since the framing overhead of a compressed payload can exceed the original size.

#### Enabling compression per component

Since you ran some benchmarks, what do you think about including them? With the obvious disclaimer that the tests were done in a limited capacity within the PR and should only be used as a guide.
