Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void setUpstreamConfiguration(TransportConfiguration transportConfigurati
//The federated server that creates the upstream back will rely on its config from the acceptor for TLS
stripParam(params, TransportConstants.SSL_ENABLED_PROP_NAME);
stripParam(params, TransportConstants.SSL_PROVIDER);
stripParam(params, TransportConstants.SSL_HANDSHAKE_TIMEOUT);
stripParam(params, TransportConstants.KEYSTORE_PATH_PROP_NAME);
stripParam(params, TransportConstants.KEYSTORE_PASSWORD_PROP_NAME);
stripParam(params, TransportConstants.KEYSTORE_PROVIDER_PROP_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public class TransportConstants {

public static final String SSL_ENABLED_PROP_NAME = "sslEnabled";

public static final String SSL_HANDSHAKE_TIMEOUT = "sslHandshakeTimeout";

public static final int DEFAULT_SSL_HANDSHAKE_TIMEOUT = 10;

public static final String PROXY_PROTOCOL_ENABLED_PROP_NAME = "proxyProtocolEnabled";

public static final String SSL_AUTO_RELOAD_PROP_NAME = "sslAutoReload";
Expand Down Expand Up @@ -436,6 +440,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
Set<String> allowableAcceptorKeys = new HashSet<>();
allowableAcceptorKeys.add(TransportConstants.SSL_ENABLED_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.SSL_AUTO_RELOAD_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.SSL_HANDSHAKE_TIMEOUT);
allowableAcceptorKeys.add(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public class NettyAcceptor extends AbstractAcceptor {

private final boolean sslEnabled;

private final int sslHandshakeTimeout;

private final boolean proxyProtocolEnabled;

private final boolean useInvm;
Expand Down Expand Up @@ -301,6 +303,8 @@ public NettyAcceptor(final String name,

sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration);

sslHandshakeTimeout = ConfigurationHelper.getIntProperty(TransportConstants.SSL_HANDSHAKE_TIMEOUT, TransportConstants.DEFAULT_SSL_HANDSHAKE_TIMEOUT, configuration);

proxyProtocolEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.PROXY_PROTOCOL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_PROXY_PROTOCOL_ENABLED, configuration);

remotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, Runtime.getRuntime().availableProcessors() * 3, configuration);
Expand Down Expand Up @@ -738,7 +742,9 @@ public SslHandler getSslHandler(ByteBufAllocator alloc, String peerHost, int pee
engine.setSSLParameters(sslParameters);
}

return new SslHandler(engine);
SslHandler sslHandler = new SslHandler(engine);
sslHandler.setHandshakeTimeout(sslHandshakeTimeout, TimeUnit.SECONDS);
return sslHandler;
}

private SSLEngine loadJdkSslEngine(String peerHost, int peerPort) throws Exception {
Expand All @@ -758,6 +764,9 @@ private void checkSSLConfiguration() throws IllegalArgumentException {
if (keyStorePath == null && keyStoreProvider == null) {
throw new IllegalArgumentException("If \"" + TransportConstants.SSL_ENABLED_PROP_NAME + "\" is true then \"" + TransportConstants.KEYSTORE_PATH_PROP_NAME + "\" must be non-null unless an alternative \"" + TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "\" has been specified.");
}
if (sslHandshakeTimeout < 0) {
throw new IllegalArgumentException("\"" + TransportConstants.SSL_HANDSHAKE_TIMEOUT + "\" value must be >= 0");
}
}

private SSLEngine loadOpenSslEngine(ByteBufAllocator alloc, String peerHost, int peerPort) throws Exception {
Expand Down
6 changes: 6 additions & 0 deletions docs/user-manual/configuring-transports.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,12 @@ Must be `true` to have the broker 'watch' an acceptors keyStorePath and/or trust
The watch period is controlled by xref:config-reload.adoc#configuration-reload[the configuration reload feature].
Default is `false`.

sslHandshakeTimeout::
This is only valid for acceptors.
It is the number of seconds the broker will wait for a client to complete the SSL handshake before closing the connection.
Any non-negative integer value is valid, where `0` disables the timeout.
Default is `10`.

keyStorePath::
When used on an `acceptor` this is the path to the SSL key store on the server which holds the server's certificates (whether self-signed or signed by an authority).
+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,47 @@ public void onTransportError(Throwable cause) {
server.stop();
}
}

@Test
public void testSslHandshakeTimeoutWithValueSet() throws Exception {
final int SSL_HANDSHAKE_TIMEOUT = 20;

String url = "tcp://127.0.0.1:61616?sslEnabled=true;keyStorePath=server-ca-truststore.p12;keyStorePassword=securepass;sslHandshakeTimeout=20;handshake-timeout=0";
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(false)
.clearAcceptorConfigurations()
.addAcceptorConfiguration("netty", url)));
server.start();

TransportConfiguration tc = server.getConfiguration().getAcceptorConfigurations().iterator().next();
String host = (String) tc.getParams().get(TransportConstants.HOST_PROP_NAME);
String port = (String) tc.getParams().get(TransportConstants.PORT_PROP_NAME);
Object sslHandshakeTimeout = tc.getParams().get(TransportConstants.SSL_HANDSHAKE_TIMEOUT);
assertNotNull(sslHandshakeTimeout);
assertEquals(SSL_HANDSHAKE_TIMEOUT, Integer.parseInt(sslHandshakeTimeout.toString()));

NettyTransport transport = NettyTransportFactory.createTransport(new URI("tcp://" + host + ":" + port));
transport.setTransportListener(new NettyTransportListener() {
@Override
public void onData(ByteBuf incoming) {

}

@Override
public void onTransportClosed() {
}

@Override
public void onTransportError(Throwable cause) {
}

});

try {
transport.connect();
assertTrue(Wait.waitFor(() -> !transport.isConnected(), TimeUnit.SECONDS.toMillis(SSL_HANDSHAKE_TIMEOUT + 1)), "Connection should be closed now");
} finally {
transport.close();
server.stop();
}
}
}