3 changes: 3 additions & 0 deletions conf/defaults.yaml
@@ -55,9 +55,12 @@ storm.nimbus.zookeeper.acls.fixup: true
storm.auth.simple-white-list.users: [ ]
storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory"
storm.meta.serialization.delegate: "org.apache.storm.serialization.ZstdBridgeThriftSerializationDelegate"
topology.tuple.compression.threshold: 1460
topology.tuple.compression.enable: false
storm.compression.zstd.level: 3
storm.compression.zstd.max.decompressed.bytes: 104857600
storm.compression.gzip.max.decompressed.bytes: 104857600
topology.tuple.compression.max.decompressed.bytes: 10485760
storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor"
storm.workers.artifacts.dir: "workers-artifacts"
storm.health.check.dir: "healthchecks"
56 changes: 56 additions & 0 deletions docs/Serialization.md
@@ -61,6 +61,62 @@ Beware that Java serialization is extremely expensive, both in terms of CPU cost

You can turn on/off the behavior to fall back on Java serialization by setting the `Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION` config to true/false. The default value is false for security reasons.

### Tuple compression

For inter-worker (remote) traffic, Storm can optionally compress serialized tuples with [Zstandard](https://facebook.github.io/zstd/) before they are sent over the network. This is intended for one specific scenario: components that emit **large** payloads to a remote worker, where the bytes saved on the wire outweigh the CPU cost of compression. A good example is a spout that emits entire lines of text to a downstream bolt running on a different worker.

Compression is **disabled by default** and follows the serialization lifecycle exactly:

- **Intra-worker (local) traffic** bypasses `KryoTupleSerializer` altogether, so it is never compressed regardless of configuration. You do not pay any CPU cost for tuples that stay inside a worker process.
- **Inter-worker (remote) traffic** is compressed only when compression is enabled for the source component *and* the serialized tuple is larger than the configured threshold. Small tuples (single words, IDs, etc.) are left uncompressed, since the framing overhead of a compressed payload can exceed the original size.
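
The two rules above can be sketched as a single predicate. This is a minimal illustration of the documented behavior, not Storm's actual code: the class `TupleCompressionDecision` and the method `shouldCompress` are hypothetical names introduced here.

```java
/** Hypothetical helper illustrating the documented rules; not part of Storm's API. */
final class TupleCompressionDecision {
    private TupleCompressionDecision() {
    }

    /**
     * Returns true only for remote-bound tuples whose source component has
     * compression enabled and whose serialized size exceeds the threshold.
     */
    static boolean shouldCompress(boolean remote, boolean enabledForSource,
                                  int serializedBytes, int thresholdBytes) {
        if (!remote) {
            // Local traffic bypasses serialization, so it is never compressed.
            return false;
        }
        // Tuples at or below the threshold are sent uncompressed.
        return enabledForSource && serializedBytes > thresholdBytes;
    }

    public static void main(String[] args) {
        int threshold = 1460; // default of topology.tuple.compression.threshold
        System.out.println(shouldCompress(true, true, 1500, threshold));  // true
        System.out.println(shouldCompress(true, true, 1460, threshold));  // false: at the threshold
        System.out.println(shouldCompress(false, true, 1500, threshold)); // false: local traffic
        System.out.println(shouldCompress(true, false, 1500, threshold)); // false: not enabled
    }
}
```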

#### Enabling compression per component

Review comment (Contributor): Since you did some benchmarks, what do you think about including them? With the obvious disclaimer that the tests were done in a limited capacity within the PR and should only be used as a guide.


Compression is controlled by the component-specific configuration `topology.tuple.compression.enable`. Because Storm merges component-specific configuration over the topology configuration, you can enable it for just the components that emit large tuples, leaving the rest of the topology untouched:

```java
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(SPOUT_ID, new FileReadSpout(inputFile), spoutNum)
       .addConfiguration(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE, true);

builder.setBolt(SPLIT_ID, new SplitSentenceBolt(), spBoltNum)
       .localOrShuffleGrouping(SPOUT_ID);
builder.setBolt(COUNT_ID, new CountBolt(), cntBoltNum)
       .fieldsGrouping(SPLIT_ID, new Fields(SplitSentenceBolt.FIELDS));
```

You can also enable it topology-wide (or cluster-wide via `storm.yaml`) by setting `topology.tuple.compression.enable: true`, but enabling it only where large tuples are actually emitted is recommended.
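
The override behavior described in this section can be modeled with plain maps. The sketch below assumes that a component-specific entry simply replaces the topology-level entry with the same key; `ComponentConfigMerge`, `merge`, and `compressionEnabled` are hypothetical names, and Storm's real merge logic may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of merging component config over topology config. */
final class ComponentConfigMerge {
    static final String ENABLE_KEY = "topology.tuple.compression.enable";

    private ComponentConfigMerge() {
    }

    /** Component-specific entries override topology-level entries with the same key. */
    static Map<String, Object> merge(Map<String, Object> topoConf,
                                     Map<String, Object> componentConf) {
        Map<String, Object> merged = new HashMap<>(topoConf);
        merged.putAll(componentConf);
        return merged;
    }

    /** Treats a missing or non-true value as "disabled". */
    static boolean compressionEnabled(Map<String, Object> mergedConf) {
        return Boolean.TRUE.equals(mergedConf.get(ENABLE_KEY));
    }

    public static void main(String[] args) {
        Map<String, Object> topoConf = new HashMap<>();
        topoConf.put(ENABLE_KEY, false); // topology-wide default

        Map<String, Object> spoutConf = new HashMap<>();
        spoutConf.put(ENABLE_KEY, true); // what addConfiguration(...) sets on the spout

        Map<String, Object> boltConf = new HashMap<>(); // no override

        System.out.println(compressionEnabled(merge(topoConf, spoutConf))); // true
        System.out.println(compressionEnabled(merge(topoConf, boltConf)));  // false
    }
}
```

Under this model, only the spout's outgoing tuples become candidates for compression; the bolts keep the topology-wide value.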

#### Flux

> **Note:** With [Flux](flux.html), only **topology-wide** enablement is currently possible. Flux has no per-component configuration mechanism — `FluxBuilder` applies only parallelism, number of tasks, memory/CPU load, and groupings to the underlying declarers, and the `config:` block is topology-scoped. There is no Flux equivalent of `declarer.addConfiguration(...)`, so the per-component approach recommended above cannot be expressed in a Flux YAML definition.

To enable compression for a Flux topology, set it in the topology-level `config:` block:

```yaml
config:
  topology.tuple.compression.enable: true
  topology.tuple.compression.threshold: 1460
```

Be aware that this enables compression for *every* remote-bound tuple in the topology that exceeds the threshold.

#### Configuration reference

| Config | Default | Description |
| --- | --- | --- |
| `topology.tuple.compression.enable` | `false` | Enables Zstd compression of serialized tuples before remote transfer. Best set per component via `addConfiguration`. |
| `topology.tuple.compression.threshold` | `1460` | Minimum serialized tuple size, in bytes, before compression is attempted. Tuples at or below this size are sent uncompressed. The default matches the typical Ethernet TCP MSS, so payloads that already fit in a single network frame are never compressed. |
| `storm.compression.zstd.level` | `3` | Zstd compression level. Supported range is 1–19; levels 20–22 (ultra mode) are prohibited because of their memory requirements. |
| `topology.tuple.compression.max.decompressed.bytes` | `10485760` (10 MiB) | Upper bound on the decompressed size of a single tuple. Decompression that would exceed this limit fails, guarding against malicious or corrupt payloads. |
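
The decompressed-size bound can be illustrated with the JDK's `java.util.zip` Deflate classes standing in for Zstd, which is not part of the standard library. This is a sketch of the general bounded-decompression technique, not Storm's implementation; `BoundedDecompression` and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical sketch: Deflate is used here only as a stand-in for Zstd. */
final class BoundedDecompression {
    private BoundedDecompression() {
    }

    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    /** Decompresses, failing as soon as the output would exceed maxBytes. */
    static byte[] decompress(byte[] compressed, int maxBytes) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && !inflater.finished()
                        && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("truncated or corrupt payload");
                }
                // Check the limit before buffering, so memory use stays bounded.
                if (out.size() + n > maxBytes) {
                    throw new IllegalStateException(
                        "decompressed size exceeds limit of " + maxBytes + " bytes");
                }
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt payload", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] packed = compress(new byte[1000]);
        System.out.println(decompress(packed, 1000).length); // 1000
        try {
            decompress(packed, 999);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Checking the limit while output is produced, rather than after the fact, is what keeps a small malicious payload from expanding into an arbitrarily large buffer.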

#### How decompression works

Compression is self-describing on the wire, so **no extra configuration is required on the receiving side**. The deserializer inspects the leading bytes of each incoming payload: if they match the Zstd magic header it decompresses the payload (bounded by `topology.tuple.compression.max.decompressed.bytes`) before deserializing, otherwise it deserializes the bytes directly. A single deserializer therefore transparently handles a mix of compressed and uncompressed tuples.
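
The header inspection might look like the following sketch. It relies on the Zstandard frame magic number `0xFD2FB528`, which is stored little-endian and therefore appears on the wire as the bytes `28 B5 2F FD`. The class and method names are hypothetical, and how Storm performs the comparison internally is an assumption.

```java
/** Hypothetical sketch of a magic-header check; not Storm's actual deserializer. */
final class ZstdMagic {
    // Zstandard frame magic number 0xFD2FB528 in little-endian byte order.
    private static final byte[] MAGIC = {(byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD};

    private ZstdMagic() {
    }

    /** Returns true if the payload starts with the four-byte Zstd frame magic. */
    static boolean looksLikeZstdFrame(byte[] payload) {
        if (payload == null || payload.length < MAGIC.length) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (payload[i] != MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] compressed = {(byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD, 0x00};
        byte[] plain = {0x01, 0x02, 0x03, 0x04, 0x05};
        System.out.println(looksLikeZstdFrame(compressed)); // true
        System.out.println(looksLikeZstdFrame(plain));      // false
    }
}
```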

As an optimization, the deserializer determines once — when the worker starts — whether *any* component in the topology enables compression (by scanning the merged per-component configurations). If none does, the magic-header check is skipped entirely and the Zstd code path is never touched, so topologies that do not use the feature pay no per-tuple cost. The corollary is that compression must be enabled somewhere in the topology config for compressed tuples to be decompressed on receipt; since the setting is part of the topology configuration shared by all of its workers, this is always the case for tuples produced within the same topology.
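
A minimal sketch of that start-up scan, assuming the merged per-component configurations are available as a map keyed by component ID. `CompressionScan` and `anyComponentEnablesCompression` are hypothetical names, not Storm APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the one-time scan performed when a worker starts. */
final class CompressionScan {
    static final String ENABLE_KEY = "topology.tuple.compression.enable";

    private CompressionScan() {
    }

    /** Returns true if at least one component's merged config enables compression. */
    static boolean anyComponentEnablesCompression(
            Map<String, Map<String, Object>> mergedConfByComponent) {
        for (Map<String, Object> conf : mergedConfByComponent.values()) {
            if (Boolean.TRUE.equals(conf.get(ENABLE_KEY))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> spoutConf = new HashMap<>();
        spoutConf.put(ENABLE_KEY, true);

        Map<String, Map<String, Object>> confs = new HashMap<>();
        confs.put("spout", spoutConf);
        confs.put("splitter", new HashMap<>());
        confs.put("counter", new HashMap<>());

        // Computed once; a deserializer could cache it in a final field and
        // skip the magic-header check for every tuple when it is false.
        boolean checkMagicHeader = anyComponentEnablesCompression(confs);
        System.out.println(checkMagicHeader); // true
    }
}
```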

### Component-specific serialization registrations

Storm 0.7.0 lets you set component-specific configurations (read more about this at [Configuration](Configuration.html)). Of course, if one component defines a serialization that serialization will need to be available to other bolts -- otherwise they won't be able to receive messages from that component!
@@ -0,0 +1,94 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.perf;

import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.perf.bolt.CountBolt;
import org.apache.storm.perf.bolt.SplitSentenceBolt;
import org.apache.storm.perf.spout.FileReadSpout;
import org.apache.storm.perf.utils.Helper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

/**
 * This topo helps measure speed of word count.
 *
 * <p>Spout loads a file into memory on initialization, then emits the lines in an endless loop.
 */
public class FileReadWordCountSpoutCompressionTopo {
    public static final String SPOUT_ID = "spout";
    public static final String COUNT_ID = "counter";
    public static final String SPLIT_ID = "splitter";
    public static final String TOPOLOGY_NAME = "FileReadWordCountSpoutCompressionTopo";

    // Config settings
    public static final String SPOUT_NUM = "spout.count";
    public static final String SPLIT_NUM = "splitter.count";
    public static final String COUNT_NUM = "counter.count";
    public static final String INPUT_FILE = "input.file";

    public static final int DEFAULT_SPOUT_NUM = 1;
    public static final int DEFAULT_SPLIT_BOLT_NUM = 2;
    public static final int DEFAULT_COUNT_BOLT_NUM = 2;


    static StormTopology getTopology(Map<String, Object> config) {

        final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
        final int spBoltNum = Helper.getInt(config, SPLIT_NUM, DEFAULT_SPLIT_BOLT_NUM);
        final int cntBoltNum = Helper.getInt(config, COUNT_NUM, DEFAULT_COUNT_BOLT_NUM);
        final String inputFile = Helper.getStr(config, INPUT_FILE);

        TopologyBuilder builder = new TopologyBuilder();
        // sampledata/longrandomwords.txt contains sentences with at least 1500 bytes
        builder.setSpout(SPOUT_ID, new FileReadSpout(inputFile), spoutNum)
               .addConfiguration(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE, true);
        builder.setBolt(SPLIT_ID, new SplitSentenceBolt(), spBoltNum).localOrShuffleGrouping(SPOUT_ID);
        builder.setBolt(COUNT_ID, new CountBolt(), cntBoltNum).fieldsGrouping(SPLIT_ID, new Fields(SplitSentenceBolt.FIELDS));

        return builder.createTopology();
    }

    public static void main(String[] args) throws Exception {
        int runTime = -1;
        Config topoConf = new Config();
        if (args.length > 0) {
            runTime = Integer.parseInt(args[0]);
        }
        if (args.length > 1) {
            topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
        }
        topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

        topoConf.putAll(Utils.readCommandLineOpts());
        if (args.length > 2) {
            System.err.println("args: [runDurationSec] [optionalConfFile]");
            return;
        }
        // Submit topology to storm cluster
        Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
    }
}