Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ public ConsumerInvokerProxy<T> createInvoker(ServiceInstance instance) {
value = invokerCache.get(key);
if (value == null || !value.isAvailable()) {
if (value != null && !value.isAvailable()) {
invokerCache.remove(key);
// CAS remove: only evict if the current cache slot is still the stale
// proxy we observed. Avoid blowing away a fresh proxy that another
// thread may have just installed (or that an earlier closeFuture hook
// would otherwise also race against).
invokerCache.remove(key, value);
}
RpcClient rpcClient;
try {
Expand All @@ -100,24 +104,30 @@ public ConsumerInvokerProxy<T> createInvoker(ServiceInstance instance) {
backendConfig.generateProtocolConfig(instance.getHost(), instance.getPort(),
backendConfig.getNetwork(), protocolType.getName(), backendConfig.getExtMap()));
ConsumerInvoker<T> originInvoker = rpcClient.createInvoker(consumerConfig);
value = new ConsumerInvokerProxy<>(FilterChain.buildConsumerChain(consumerConfig,
originInvoker), rpcClient);
invokerCache.put(key, value);
// When the rpcClient is cleaned up and closed during idle time,
// the corresponding map should also be cleaned up.
ConsumerInvokerProxy<T> created = new ConsumerInvokerProxy<>(
FilterChain.buildConsumerChain(consumerConfig, originInvoker), rpcClient);
invokerCache.put(key, created);
// Long-connection mode: the client is no longer evicted by an idle scanner.
// It is only closed on explicit shutdown or fatal transport error (or by
// the reconnect-check timer in RpcClusterClientManager). When that
// happens we still need to clean up the local invoker cache to avoid
// memory leak.
// Use CAS remove: if the cache slot has already been replaced by a newer
// proxy (e.g. after a series of rebuilds), this stale closeFuture must
// NOT evict that newer entry.
rpcClient.closeFuture().whenComplete((r, e) -> {
ConsumerInvokerProxy<T> remove = invokerCache.remove(key);
if (remove != null) {
logger.warn("Service [name=" + consumerConfig.getServiceInterface()
boolean removed = invokerCache.remove(key, created);
if (removed && logger.isDebugEnabled()) {
logger.debug("Service [name=" + consumerConfig.getServiceInterface()
.getName()
+ ", naming=" + backendConfig.getNamingOptions()
.getServiceNaming()
+ ")], remove rpc client invoker["
+ remove.getInvoker().getProtocolConfig().toSimpleString()
+ "], remove rpc client invoker["
+ created.getInvoker().getProtocolConfig().toSimpleString()
+ "], due to rpc client close");
}
});
return value;
return created;
} catch (Exception ex) {
throw TRpcException.newFrameException(ErrorCode.TRPC_INVOKE_UNKNOWN_ERR,
"Service(name=" + consumerConfig.getServiceInterface().getName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ public class Constants {

public static final String DEFAULT_KEEP_ALIVE = "true";
/**
* Default shared IO thread pool true
* Default shared IO thread pool: true. The shared pool now supports both NIO and
* Epoll: each transport joins one of two reference-counted shared groups based on
* {@code Epoll.isAvailable() && config.useEpoll()}, so enabling epoll no longer forces
* the user to also flip {@code ioThreadGroupShare} off. Server-side transports do not
* consult this flag (they always own their own group).
*/
public static final String DEFAULT_IO_THREAD_GROUPSHARE = "true";
/**
Expand Down Expand Up @@ -119,9 +123,34 @@ public class Constants {
*/
public static final String DEFAULT_BUFFER_SIZE = "16384";
/**
* Default client idle timeout 3 minutes
* Default client idle timeout 3 minutes. Drives the read-idle close handler installed
* by {@code NettyTcpClientTransport}: a long connection that has not received any
* inbound bytes for this duration is recycled by the client (the next request goes
* through lazy reconnect). Half-dead detection in faster paths is delegated to the
* configurable TCP keepalive parameters below; the read-idle handler is the slow
* universal fallback that also covers macOS / Windows where TCP keepalive tuning is
* not available.
*/
public static final String DEFAULT_IDLE_TIMEOUT = "180000";
/**
* Default TCP keepalive idle (Linux {@code TCP_KEEPIDLE}) in seconds: 30 s without
* traffic on the socket before the kernel starts emitting keepalive probes. Together
* with {@link #DEFAULT_TCP_KEEPALIVE_INTVL} and {@link #DEFAULT_TCP_KEEPALIVE_CNT}
* (Dubbo-style 30/10/3) this yields a half-dead detection window of roughly 60 s on
* Linux + epoll (30 + 10 × 3 = 60). Value 0 leaves the OS default (Linux: 7200 s).
*/
public static final String DEFAULT_TCP_KEEPALIVE_IDLE = "30";
/**
* Default TCP keepalive interval (Linux {@code TCP_KEEPINTVL}) in seconds between
* consecutive keepalive probes after the idle window has elapsed.
*/
public static final String DEFAULT_TCP_KEEPALIVE_INTVL = "10";
/**
* Default TCP keepalive probe count (Linux {@code TCP_KEEPCNT}): the number of
* unacknowledged keepalive probes after which the kernel marks the connection dead
* and emits a RST.
*/
public static final String DEFAULT_TCP_KEEPALIVE_CNT = "3";
/**
* Default server idle timeout 4 minutes
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ public ProtocolConfig generateProtocolConfig(String host, int port, String netwo
config.setPort(port);
config.setNetwork(network);
config.setIdleTimeout(idleTimeout);
config.setTcpKeepAliveIdle(tcpKeepAliveIdle);
config.setTcpKeepAliveIntvl(tcpKeepAliveIntvl);
config.setTcpKeepAliveCnt(tcpKeepAliveCnt);
config.setCharset(charset);
config.setLazyinit(lazyinit);
config.setConnsPerAddr(connsPerAddr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,28 @@ public class BaseProtocolConfig implements Serializable, Cloneable {
*/
@ConfigProperty(value = Constants.DEFAULT_IDLE_TIMEOUT, type = Integer.class, override = true, moreZero = false)
protected Integer idleTimeout;
/**
* TCP keepalive idle in seconds (Linux {@code TCP_KEEPIDLE}). Applied only on
* Linux + epoll TCP client transports. Value 0 disables tuning and leaves the OS
* default in place (Linux ≈ 7200 s).
*/
@ConfigProperty(value = Constants.DEFAULT_TCP_KEEPALIVE_IDLE, type = Integer.class, override = true,
moreZero = false)
protected Integer tcpKeepAliveIdle;
/**
* TCP keepalive probe interval in seconds (Linux {@code TCP_KEEPINTVL}). Applied only
* on Linux + epoll TCP client transports. Value 0 disables tuning.
*/
@ConfigProperty(value = Constants.DEFAULT_TCP_KEEPALIVE_INTVL, type = Integer.class, override = true,
moreZero = false)
protected Integer tcpKeepAliveIntvl;
/**
* TCP keepalive probe count (Linux {@code TCP_KEEPCNT}). Applied only on Linux +
* epoll TCP client transports. Value 0 disables tuning.
*/
@ConfigProperty(value = Constants.DEFAULT_TCP_KEEPALIVE_CNT, type = Integer.class, override = true,
moreZero = false)
protected Integer tcpKeepAliveCnt;
/**
* Whether to delay initialization.
*/
Expand Down Expand Up @@ -303,6 +325,33 @@ public void setIdleTimeout(Integer idleTimeout) {
this.idleTimeout = idleTimeout;
}

public Integer getTcpKeepAliveIdle() {
return tcpKeepAliveIdle;
}

public void setTcpKeepAliveIdle(Integer tcpKeepAliveIdle) {
checkFiledModifyPrivilege();
this.tcpKeepAliveIdle = tcpKeepAliveIdle;
}

public Integer getTcpKeepAliveIntvl() {
return tcpKeepAliveIntvl;
}

public void setTcpKeepAliveIntvl(Integer tcpKeepAliveIntvl) {
checkFiledModifyPrivilege();
this.tcpKeepAliveIntvl = tcpKeepAliveIntvl;
}

public Integer getTcpKeepAliveCnt() {
return tcpKeepAliveCnt;
}

public void setTcpKeepAliveCnt(Integer tcpKeepAliveCnt) {
checkFiledModifyPrivilege();
this.tcpKeepAliveCnt = tcpKeepAliveCnt;
}

public Boolean getLazyinit() {
return lazyinit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package com.tencent.trpc.core.transport;

import com.google.common.collect.Lists;
import com.tencent.trpc.core.common.LifecycleBase;
import com.tencent.trpc.core.common.config.ProtocolConfig;
import com.tencent.trpc.core.exception.LifecycleException;
Expand All @@ -24,6 +23,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -64,7 +64,11 @@ public abstract class AbstractClientTransport implements ClientTransport {
*/
protected AtomicInteger channelIdx = new AtomicInteger(0);
/**
* List of channels.
* Channel pool. Backed by {@link CopyOnWriteArrayList} so that the slot publication done by
* {@link #ensureChannelActive} (under {@link #connLock}) is visible to concurrent readers in
* {@link #getChannel0} with volatile semantics, eliminating the data race that an
* {@link java.util.ArrayList} would have. Size is bounded by {@code connsPerAddr}; writes
* are infrequent (slot rebuild on disconnect) so the COW copy cost is negligible.
*/
protected List<ChannelFutureItem> channels;
/**
Expand All @@ -80,7 +84,7 @@ public AbstractClientTransport(ProtocolConfig config, ChannelHandler handler,
this.config = Objects.requireNonNull(config, "config is null");
this.handler = Objects.requireNonNull(handler, "handler is null");
this.codec = clientCodec;
this.channels = Lists.newArrayListWithExpectedSize(config.getConnsPerAddr());
this.channels = new CopyOnWriteArrayList<>();
}

/**
Expand Down Expand Up @@ -253,21 +257,93 @@ protected void ensureChannelActive(int chIndex) {
ChannelFutureItem curChannelItem = channels.get(chIndex);
// initiate a new connection creation action when the connection is not yet established or the connection
// is broken
boolean futureIsDoneAndNotConnected =
!curChannelItem.isAvailable() && !curChannelItem.isConnecting();
// not available and not establishing a connection
if (futureIsDoneAndNotConnected || curChannelItem.isNotYetConnect()) {
if (!needsReconnect(curChannelItem)) {
return;
}
connLock.lock();
try {
// Double-check inside the lock: another thread may have already replaced this slot
// with a fresh ChannelFutureItem (either still connecting or already connected).
// Without this check, a thundering-herd of requests arriving right after a
// disconnect would each rebuild the slot, producing a connect/disconnect storm
// against the peer and a burst of short-lived TIME_WAIT sockets.
ChannelFutureItem latest = channels.get(chIndex);
if (!needsReconnect(latest)) {
return;
}
channels.set(chIndex, new ChannelFutureItem(createChannel().toCompletableFuture(), config));
try {
latest.close();
} catch (Exception ex) {
logger.error("close " + latest + " exception", ex);
}
} finally {
connLock.unlock();
}
}

/**
* A slot needs a fresh connection when it is either uninitialized (lazy-init not yet
* triggered) or its previous future has finished without producing a connected channel.
* A slot whose future is still in flight ({@code isConnecting()}) is left alone — the
* in-flight {@code bootstrap.connect} will publish the result into the same item.
*/
private static boolean needsReconnect(ChannelFutureItem item) {
if (item.isNotYetConnect()) {
return true;
}
return !item.isAvailable() && !item.isConnecting();
}

/**
* Invalidate the slot holding {@code target}: replace it with a blank placeholder
* ({@code channelFuture==null}, i.e. {@code isNotYetConnect=true}) so the next request
* unconditionally goes through {@link #ensureChannelActive} and rebuilds a fresh
* connection. The previous item is closed best-effort.
* <p>Called by the client-side idle handler so that <em>before</em> the actual
* {@code channel.close()} runs (asynchronously in the EventLoop), the request thread
* already sees the slot as needing a reconnect — eliminating the "request lands on a
* channel that is about to be closed" race window.</p>
*/
@Override
public void invalidateChannel(Channel target) {
if (target == null || channels == null || channels.isEmpty()) {
return;
}
// The list is bounded by connsPerAddr; a linear scan is fine.
for (int i = 0; i < channels.size(); i++) {
ChannelFutureItem item = channels.get(i);
if (item == null || item.channelFuture == null || !item.channelFuture.isDone()
|| item.channelFuture.isCompletedExceptionally()) {
continue;
}
Channel ch;
try {
ch = item.channelFuture.join();
} catch (Throwable ignore) {
continue;
}
if (ch != target) {
continue;
}
connLock.lock();
try {
channels.set(chIndex, new ChannelFutureItem(createChannel().toCompletableFuture(), config));
// Re-read under the lock to avoid clobbering a slot another thread already
// refreshed (e.g. concurrent reconnect).
ChannelFutureItem latest = channels.get(i);
if (latest != item) {
return;
}
channels.set(i, new ChannelFutureItem(null, config));
try {
curChannelItem.close();
item.close();
} catch (Exception ex) {
logger.error("close " + curChannelItem + " exception", ex);
logger.error("close invalidated " + item + " exception", ex);
}
} finally {
connLock.unlock();
}
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ public interface ClientTransport {
*/
ChannelHandler getChannelHandler();

/**
* Mark the slot currently holding {@code channel} as invalidated so that the very next
* request observes "needs reconnect" instead of routing onto the about-to-be-closed
* channel. The default no-op keeps the contract optional for transports that do not
* support a slot/pool model.
* <p>This is the hook used by client-side idle handlers: when the idle handler decides to
* tear down a long-idle channel, it calls {@code invalidateChannel} <b>before</b>
* {@code channel.close()} so that the request-thread side cannot read the stale slot any
* longer. The actual TCP close happens asynchronously in the EventLoop afterwards.</p>
*
* @param channel the channel about to be closed; may be {@code null} (no-op).
*/
default void invalidateChannel(Channel channel) {
// default no-op
}

/**
* Get the remote address.
*/
Expand Down
Loading
Loading