diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java index 6c10ced4652..dffd18ae5c7 100644 --- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java +++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java @@ -24,6 +24,7 @@ import io.grpc.ChannelCredentials; import io.grpc.ChannelLogger; import io.grpc.HttpConnectProxiedSocketAddress; +import io.grpc.MetricRecorder; import java.io.Closeable; import java.net.SocketAddress; import java.util.Collection; @@ -91,6 +92,7 @@ final class ClientTransportOptions { private Attributes eagAttributes = Attributes.EMPTY; @Nullable private String userAgent; @Nullable private HttpConnectProxiedSocketAddress connectProxiedSocketAddr; + @Nullable private MetricRecorder metricRecorder; public ChannelLogger getChannelLogger() { return channelLogger; @@ -101,6 +103,16 @@ public ClientTransportOptions setChannelLogger(ChannelLogger channelLogger) { return this; } + @Nullable + public MetricRecorder getMetricRecorder() { + return metricRecorder; + } + + public ClientTransportOptions setMetricRecorder(@Nullable MetricRecorder metricRecorder) { + this.metricRecorder = metricRecorder; + return this; + } + public String getAuthority() { return authority; } diff --git a/core/src/main/java/io/grpc/internal/InternalServer.java b/core/src/main/java/io/grpc/internal/InternalServer.java index a6079081233..d8cf764be15 100644 --- a/core/src/main/java/io/grpc/internal/InternalServer.java +++ b/core/src/main/java/io/grpc/internal/InternalServer.java @@ -18,6 +18,7 @@ import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalInstrumented; +import io.grpc.MetricRecorder; import java.io.IOException; import java.net.SocketAddress; import java.util.List; @@ -71,4 +72,9 @@ public interface InternalServer { */ @Nullable List> getListenSocketStatsList(); + /** + * Sets the MetricRecorder for the server. This optional method allows setting + * the MetricRecorder after construction but before start(). + */ + default void setMetricRecorder(MetricRecorder metricRecorder) {} } diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 7a48bf642fe..ce31921e316 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -80,6 +80,7 @@ final class InternalSubchannel implements InternalInstrumented, Tr private final InternalChannelz channelz; private final CallTracer callsTracer; private final ChannelTracer channelTracer; + private final MetricRecorder metricRecorder; private final ChannelLogger channelLogger; private final boolean reconnectDisabled; @@ -191,6 +192,7 @@ protected void handleNotInUse() { this.scheduledExecutor = scheduledExecutor; this.connectingTimer = stopwatchSupplier.get(); this.syncContext = syncContext; + this.metricRecorder = metricRecorder; this.callback = callback; this.channelz = channelz; this.callsTracer = callsTracer; @@ -265,6 +267,7 @@ private void startNewTransport() { .setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority) .setEagAttributes(currentEagAttributes) .setUserAgent(userAgent) + .setMetricRecorder(metricRecorder) .setHttpConnectProxiedSocketAddress(proxiedAddr); TransportLogger transportLogger = new TransportLogger(); // In case the transport logs in the constructor, use the subchannel logId diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index dc0709e1fb8..0c422d3408e 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -48,6 +48,8 @@ import io.grpc.InternalServerInterceptors; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricInstrumentRegistry; +import io.grpc.MetricRecorder; import io.grpc.ServerCall; import io.grpc.ServerCallExecutorSupplier; import io.grpc.ServerCallHandler; @@ -97,6 +99,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume private final InternalLogId logId; private final ObjectPool executorPool; + private final MetricRecorder metricRecorder; /** Executor for application processing. Safe to read after {@link #start()}. */ private Executor executor; private final HandlerRegistry registry; @@ -143,6 +146,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume InternalServer transportServer, Context rootContext) { this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool"); + this.metricRecorder = + new MetricRecorderImpl(builder.metricSinks, MetricInstrumentRegistry.getDefaultRegistry()); + this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder"); this.fallbackRegistry = Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); @@ -182,6 +188,7 @@ public ServerImpl start() throws IOException { // Start and wait for any ports to actually be bound. ServerListenerImpl listener = new ServerListenerImpl(); + transportServer.setMetricRecorder(metricRecorder); transportServer.start(listener); executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); started = true; diff --git a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java index f6566e067db..64cb238fbf4 100644 --- a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java @@ -31,6 +31,7 @@ import io.grpc.HandlerRegistry; import io.grpc.InternalChannelz; import io.grpc.InternalConfiguratorRegistry; +import io.grpc.MetricSink; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerCallExecutorSupplier; @@ -80,6 +81,7 @@ public static ServerBuilder forPort(int port) { final List transportFilters = new ArrayList<>(); final List interceptors = new ArrayList<>(); private final List streamTracerFactories = new ArrayList<>(); + final List metricSinks = new ArrayList<>(); private final ClientTransportServersBuilder clientTransportServersBuilder; HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY; ObjectPool executorPool = DEFAULT_EXECUTOR_POOL; @@ -157,6 +159,14 @@ public ServerImplBuilder intercept(ServerInterceptor interceptor) { return this; } + /** + * Adds a MetricSink to the server. + */ + public ServerImplBuilder addMetricSink(MetricSink metricSink) { + metricSinks.add(checkNotNull(metricSink, "metricSink")); + return this; + } + @Override public ServerImplBuilder addStreamTracerFactory(ServerStreamTracer.Factory factory) { streamTracerFactories.add(checkNotNull(factory, "factory")); diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 258aa15b005..e64f1065681 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -856,6 +856,7 @@ public void run() { localSocketPicker, channelLogger, useGetForSafeMethods, + options.getMetricRecorder(), Ticker.systemTicker()); return transport; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 8ebf89842ad..855921d99a2 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -30,6 +30,7 @@ import io.grpc.InternalChannelz; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.internal.ClientStreamListener.RpcProgress; @@ -123,6 +124,7 @@ class NettyClientHandler extends AbstractNettyHandler { private final Supplier stopwatchFactory; private final TransportTracer transportTracer; private final Attributes eagAttributes; + private final TcpMetrics.Tracker tcpMetrics; private final String authority; private final InUseStateAggregator inUseState = new InUseStateAggregator() { @@ -164,7 +166,8 @@ static NettyClientHandler newHandler( Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); @@ -194,7 +197,8 @@ static NettyClientHandler newHandler( eagAttributes, authority, negotiationLogger, - ticker); + ticker, + metricRecorder); } @VisibleForTesting @@ -214,7 +218,8 @@ static NettyClientHandler newHandler( Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkNotNull(connection, "connection"); Preconditions.checkNotNull(frameReader, "frameReader"); Preconditions.checkNotNull(lifecycleManager, "lifecycleManager"); @@ -269,7 +274,8 @@ static NettyClientHandler newHandler( pingCounter, ticker, maxHeaderListSize, - softLimitHeaderListSize); + softLimitHeaderListSize, + metricRecorder); } private NettyClientHandler( @@ -288,7 +294,8 @@ private NettyClientHandler( PingLimiter pingLimiter, Ticker ticker, int maxHeaderListSize, - int softLimitHeaderListSize) { + int softLimitHeaderListSize, + MetricRecorder metricRecorder) { super( /* channelUnused= */ null, decoder, @@ -350,6 +357,7 @@ public void onStreamClosed(Http2Stream stream) { } } }); + this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder, "client"); } /** @@ -478,6 +486,7 @@ private void onRstStreamRead(int streamId, long errorCode) { @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + tcpMetrics.recordTcpInfo(ctx.channel()); logger.fine("Network channel being closed by the application."); if (ctx.channel().isActive()) { // Ignore notification that the socket was closed lifecycleManager.notifyShutdown( @@ -490,10 +499,17 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce /** * Handler for the Channel shutting down. */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + tcpMetrics.channelActive(ctx.channel()); + super.channelActive(ctx); + } + @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { logger.fine("Network channel is closed"); + tcpMetrics.channelInactive(ctx.channel()); Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason"); lifecycleManager.notifyShutdown(status, SimpleDisconnectError.UNKNOWN); final Status streamStatus; diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 53914b3c877..6585df42df3 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -34,6 +34,7 @@ import io.grpc.InternalLogId; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.internal.ClientStream; import io.grpc.internal.ConnectionClientTransport; @@ -108,6 +109,7 @@ class NettyClientTransport implements ConnectionClientTransport, private final ChannelLogger channelLogger; private final boolean useGetForSafeMethods; private final Ticker ticker; + private final MetricRecorder metricRecorder; NettyClientTransport( @@ -132,6 +134,7 @@ class NettyClientTransport implements ConnectionClientTransport, LocalSocketPicker localSocketPicker, ChannelLogger channelLogger, boolean useGetForSafeMethods, + MetricRecorder metricRecorder, Ticker ticker) { this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator"); @@ -159,6 +162,7 @@ class NettyClientTransport implements ConnectionClientTransport, this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString()); this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger"); this.useGetForSafeMethods = useGetForSafeMethods; + this.metricRecorder = metricRecorder; this.ticker = Preconditions.checkNotNull(ticker, "ticker"); } @@ -251,7 +255,8 @@ public Runnable start(Listener transportListener) { eagAttributes, authorityString, channelLogger, - ticker); + ticker, + metricRecorder); ChannelHandler negotiationHandler = negotiator.newHandler(handler); diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 1cf67ea25ca..03c851333ba 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -21,6 +21,7 @@ import static io.netty.channel.ChannelOption.ALLOCATOR; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; @@ -31,6 +32,7 @@ import io.grpc.InternalInstrumented; import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.internal.InternalServer; import io.grpc.internal.ObjectPool; @@ -67,6 +69,7 @@ import java.util.concurrent.Callable; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Netty-based server implementation. @@ -93,6 +96,7 @@ class NettyServer implements InternalServer, InternalWithLogId { private final int maxMessageSize; private final int maxHeaderListSize; private final int softLimitHeaderListSize; + private MetricRecorder metricRecorder; private final long keepAliveTimeInNanos; private final long keepAliveTimeoutInNanos; private final long maxConnectionIdleInNanos; @@ -136,7 +140,8 @@ class NettyServer implements InternalServer, InternalWithLogId { long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, int maxRstCount, long maxRstPeriodNanos, - Attributes eagAttributes, InternalChannelz channelz) { + Attributes eagAttributes, InternalChannelz channelz, + @Nullable MetricRecorder metricRecorder) { this.addresses = checkNotNull(addresses, "addresses"); this.channelFactory = checkNotNull(channelFactory, "channelFactory"); checkNotNull(channelOptions, "channelOptions"); @@ -172,6 +177,13 @@ class NettyServer implements InternalServer, InternalWithLogId { this.channelz = Preconditions.checkNotNull(channelz); this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" : String.valueOf(addresses)); + this.metricRecorder = metricRecorder; + } + + @VisibleForTesting + @Nullable + MetricRecorder getMetricRecorder() { + return metricRecorder; } @Override @@ -272,7 +284,8 @@ public void initChannel(Channel ch) { permitKeepAliveTimeInNanos, maxRstCount, maxRstPeriodNanos, - eagAttributes); + eagAttributes, + metricRecorder); ServerTransportListener transportListener; // This is to order callbacks on the listener, not to guard access to channel. synchronized (NettyServer.this) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index eb3a6d9b538..9420a9c9514 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -32,6 +32,7 @@ import io.grpc.ExperimentalApi; import io.grpc.ForwardingServerBuilder; import io.grpc.Internal; +import io.grpc.MetricRecorder; import io.grpc.ServerBuilder; import io.grpc.ServerCredentials; import io.grpc.ServerStreamTracer; @@ -60,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import javax.net.ssl.SSLException; /** @@ -116,6 +118,7 @@ public final class NettyServerBuilder extends ForwardingServerBuilder streamTracerFactories) { assertEventLoopsAndChannelType(); @@ -737,7 +754,8 @@ NettyServer buildTransportServers( maxRstCount, maxRstPeriodNanos, eagAttributes, - this.serverImplBuilder.getChannelz()); + this.serverImplBuilder.getChannelz(), + metricRecorder); } @VisibleForTesting diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 036fde55e2c..fff816dcd6e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -42,6 +42,7 @@ import io.grpc.InternalMetadata; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.GrpcUtil; @@ -127,6 +128,7 @@ class NettyServerHandler extends AbstractNettyHandler { private final Http2Connection.PropertyKey streamKey; private final ServerTransportListener transportListener; private final int maxMessageSize; + private final TcpMetrics.Tracker tcpMetrics; private final long keepAliveTimeInNanos; private final long keepAliveTimeoutInNanos; private final long maxConnectionAgeInNanos; @@ -174,7 +176,8 @@ static NettyServerHandler newHandler( long permitKeepAliveTimeInNanos, int maxRstCount, long maxRstPeriodNanos, - Attributes eagAttributes) { + Attributes eagAttributes, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s", maxHeaderListSize); Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class); @@ -208,7 +211,8 @@ static NettyServerHandler newHandler( maxRstCount, maxRstPeriodNanos, eagAttributes, - Ticker.systemTicker()); + Ticker.systemTicker(), + metricRecorder); } static NettyServerHandler newHandler( @@ -234,7 +238,8 @@ static NettyServerHandler newHandler( int maxRstCount, long maxRstPeriodNanos, Attributes eagAttributes, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", flowControlWindow); @@ -294,7 +299,8 @@ static NettyServerHandler newHandler( keepAliveEnforcer, autoFlowControl, rstStreamCounter, - eagAttributes, ticker); + eagAttributes, ticker, + metricRecorder); } private NettyServerHandler( @@ -318,7 +324,8 @@ private NettyServerHandler( boolean autoFlowControl, RstStreamCounter rstStreamCounter, Attributes eagAttributes, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { super( channelUnused, decoder, @@ -362,6 +369,7 @@ public void onStreamClosed(Http2Stream stream) { checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize); this.maxMessageSize = maxMessageSize; + this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder, "server"); this.keepAliveTimeInNanos = keepAliveTimeInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; this.maxConnectionIdleManager = maxConnectionIdleManager; @@ -663,6 +671,7 @@ void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) { */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + tcpMetrics.channelInactive(ctx.channel()); try { if (keepAliveManager != null) { keepAliveManager.onTransportTermination(); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 758ffeee5b1..c0e52b75876 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -25,6 +25,7 @@ import io.grpc.Attributes; import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalLogId; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.ServerTransport; @@ -81,6 +82,7 @@ class NettyServerTransport implements ServerTransport { private final int maxRstCount; private final long maxRstPeriodNanos; private final Attributes eagAttributes; + private final MetricRecorder metricRecorder; private final List streamTracerFactories; private final TransportTracer transportTracer; @@ -105,7 +107,8 @@ class NettyServerTransport implements ServerTransport { long permitKeepAliveTimeInNanos, int maxRstCount, long maxRstPeriodNanos, - Attributes eagAttributes) { + Attributes eagAttributes, + MetricRecorder metricRecorder) { this.channel = Preconditions.checkNotNull(channel, "channel"); this.channelUnused = channelUnused; this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); @@ -128,6 +131,7 @@ class NettyServerTransport implements ServerTransport { this.maxRstCount = maxRstCount; this.maxRstPeriodNanos = maxRstPeriodNanos; this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes"); + this.metricRecorder = metricRecorder; SocketAddress remote = channel.remoteAddress(); this.logId = InternalLogId.allocate(getClass(), remote != null ? remote.toString() : null); } @@ -289,6 +293,7 @@ private NettyServerHandler createHandler( permitKeepAliveTimeInNanos, maxRstCount, maxRstPeriodNanos, - eagAttributes); + eagAttributes, + metricRecorder); } } diff --git a/netty/src/main/java/io/grpc/netty/TcpMetrics.java b/netty/src/main/java/io/grpc/netty/TcpMetrics.java new file mode 100644 index 00000000000..95cebec16ca --- /dev/null +++ b/netty/src/main/java/io/grpc/netty/TcpMetrics.java @@ -0,0 +1,385 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import io.grpc.DoubleHistogramMetricInstrument; +import io.grpc.LongCounterMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; +import io.grpc.MetricInstrument; +import io.grpc.MetricInstrumentRegistry; +import io.grpc.MetricRecorder; +import io.netty.channel.Channel; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Utility for collecting TCP metrics from Netty channels. + */ +final class TcpMetrics { + + private static final Metrics DEFAULT_METRICS; + + static { + boolean epollAvailable = false; + try { + Class epollClass = Class.forName("io.netty.channel.epoll.Epoll"); + Method isAvailableMethod = epollClass.getDeclaredMethod("isAvailable"); + epollAvailable = (Boolean) isAvailableMethod.invoke(null); + } catch (Throwable t) { + // Ignored + } + DEFAULT_METRICS = new Metrics(MetricInstrumentRegistry.getDefaultRegistry(), epollAvailable); + } + + static Metrics getDefaultMetrics() { + return DEFAULT_METRICS; + } + + static final class Metrics { + final LongCounterMetricInstrument connectionsCreated; + final LongUpDownCounterMetricInstrument connectionCount; + @Nullable final LongCounterMetricInstrument packetsRetransmitted; + @Nullable final LongCounterMetricInstrument recurringRetransmits; + @Nullable final DoubleHistogramMetricInstrument minRtt; + + Metrics(MetricInstrumentRegistry registry, boolean epollAvailable) { + List requiredLabels = Collections.singletonList("grpc.target"); + List optionalLabels = Arrays.asList( + "network.local.address", + "network.local.port", + "network.peer.address", + "network.peer.port"); + + connectionsCreated = safelyRegisterLongCounter(registry, + "grpc.tcp.connections_created", + "Number of TCP connections created.", + "{connection}", + requiredLabels, + optionalLabels); + + connectionCount = safelyRegisterLongUpDownCounter(registry, + "grpc.tcp.connection_count", + "Number of currently open TCP connections.", + "{connection}", + requiredLabels, + optionalLabels); + + if (epollAvailable) { + packetsRetransmitted = safelyRegisterLongCounter(registry, + "grpc.tcp.packets_retransmitted", + "Total number of packets retransmitted for a single TCP connection.", + "{packet}", + requiredLabels, + optionalLabels); + + recurringRetransmits = safelyRegisterLongCounter(registry, + "grpc.tcp.recurring_retransmits", + "Total number of unacknowledged packets to be retransmitted " + + "since the last acknowledgment.", + "{packet}", + requiredLabels, + optionalLabels); + + minRtt = safelyRegisterDoubleHistogram(registry, + "grpc.tcp.min_rtt", + "Minimum RTT observed for a single TCP connection.", + "s", + Arrays.asList(0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, + 5.0, 10.0, 25.0, 50.0, 100.0, 250.0), + requiredLabels, + optionalLabels); + } else { + packetsRetransmitted = null; + recurringRetransmits = null; + minRtt = null; + } + } + } + + /** + * Safe metric registration or retrieval for environments where TcpMetrics might + * be loaded multiple times (e.g., shaded and unshaded). + */ + private static LongCounterMetricInstrument safelyRegisterLongCounter( + MetricInstrumentRegistry registry, String name, String description, String unit, + List requiredLabelKeys, List optionalLabelKeys) { + try { + return registry.registerLongCounter(name, description, unit, requiredLabelKeys, + optionalLabelKeys, false); + } catch (IllegalStateException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + for (MetricInstrument instrument : registry.getMetricInstruments()) { + if (instrument.getName().equals(name) + && instrument instanceof LongCounterMetricInstrument) { + return (LongCounterMetricInstrument) instrument; + } + } + } + throw e; + } + } + + private static LongUpDownCounterMetricInstrument safelyRegisterLongUpDownCounter( + MetricInstrumentRegistry registry, String name, String description, String unit, + List requiredLabelKeys, List optionalLabelKeys) { + try { + return registry.registerLongUpDownCounter(name, description, unit, requiredLabelKeys, + optionalLabelKeys, false); + } catch (IllegalStateException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + for (MetricInstrument instrument : registry.getMetricInstruments()) { + if (instrument.getName().equals(name) + && instrument instanceof LongUpDownCounterMetricInstrument) { + return (LongUpDownCounterMetricInstrument) instrument; + } + } + } + throw e; + } + } + + private static DoubleHistogramMetricInstrument safelyRegisterDoubleHistogram( + MetricInstrumentRegistry registry, String name, String description, String unit, + List bucketBoundaries, List requiredLabelKeys, + List optionalLabelKeys) { + try { + return registry.registerDoubleHistogram(name, description, unit, bucketBoundaries, + requiredLabelKeys, optionalLabelKeys, false); + } catch (IllegalStateException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + for (MetricInstrument instrument : registry.getMetricInstruments()) { + if (instrument.getName().equals(name) + && instrument instanceof DoubleHistogramMetricInstrument) { + return (DoubleHistogramMetricInstrument) instrument; + } + } + } + throw e; + } + } + + private static final class ChannelReflectionAccessor { + final Method tcpInfoMethod; + final Constructor tcpInfoConstructor; + final Method totalRetransMethod; + final Method retransmitsMethod; + final Method rttMethod; + + ChannelReflectionAccessor( + Method tcpInfoMethod, Constructor tcpInfoConstructor, Method totalRetransMethod, + Method retransmitsMethod, Method rttMethod) { + this.tcpInfoMethod = tcpInfoMethod; + this.tcpInfoConstructor = tcpInfoConstructor; + this.totalRetransMethod = totalRetransMethod; + this.retransmitsMethod = retransmitsMethod; + this.rttMethod = rttMethod; + } + } + + static final class Tracker { + private final MetricRecorder metricRecorder; + private final String target; + private final Metrics metrics; + private final String epollSocketChannelClassName; + private final String epollTcpInfoClassName; + private volatile ChannelReflectionAccessor channelReflectionAccessor; + private long lastTotalRetrans; + + Tracker(MetricRecorder metricRecorder, String target) { + this(metricRecorder, target, DEFAULT_METRICS); + } + + Tracker(MetricRecorder metricRecorder, String target, Metrics metrics) { + this(metricRecorder, target, metrics, + "io.netty.channel.epoll.EpollSocketChannel", + "io.netty.channel.epoll.EpollTcpInfo"); + } + + Tracker(MetricRecorder metricRecorder, String target, Metrics metrics, + String epollSocketChannelClassName, String epollTcpInfoClassName) { + this.metricRecorder = metricRecorder; + this.target = target; + this.metrics = metrics; + this.epollSocketChannelClassName = epollSocketChannelClassName; + this.epollTcpInfoClassName = epollTcpInfoClassName; + } + + private static final Map accessorCache = + new HashMap<>(); + + private static final long RECORD_INTERVAL_MILLIS; + + static { + long interval = 5; + try { + String flagValue = System.getProperty("io.grpc.netty.tcpMetricsRecordIntervalMinutes"); + if (flagValue != null) { + interval = Long.parseLong(flagValue); + } + } catch (NumberFormatException e) { + // Use default + } + RECORD_INTERVAL_MILLIS = java.util.concurrent.TimeUnit.MINUTES.toMillis(interval); + } + + private static final java.util.Random RANDOM = new java.util.Random(); + private io.netty.util.concurrent.ScheduledFuture reportTimer; + + void channelActive(Channel channel) { + if (metricRecorder != null && target != null) { + java.util.List labelValues = getLabelValues(channel); + metricRecorder.addLongCounter(metrics.connectionsCreated, 1, + Collections.singletonList(target), labelValues); + metricRecorder.addLongUpDownCounter(metrics.connectionCount, 1, + Collections.singletonList(target), labelValues); + scheduleNextReport(channel); + } + } + + private void scheduleNextReport(final Channel channel) { + if (RECORD_INTERVAL_MILLIS <= 0) { + return; + } + if (!channel.isActive()) { + return; + } + + double jitter = 0.1 + RANDOM.nextDouble(); // 10% to 110% + long delayMillis = (long) (RECORD_INTERVAL_MILLIS * jitter); + + try { + reportTimer = channel.eventLoop().schedule(new Runnable() { + @Override + public void run() { + if (channel.isActive()) { + Tracker.this.recordTcpInfo(channel); // Renamed from channelInactive to recordTcpInfo + scheduleNextReport(channel); // Re-arm + } + } + }, delayMillis, java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (Throwable t) { + // Channel closed, event loop shut down, etc. + } + } + + void channelInactive(Channel channel) { + if (reportTimer != null) { + reportTimer.cancel(false); + } + if (metricRecorder != null && target != null) { + java.util.List labelValues = getLabelValues(channel); + metricRecorder.addLongUpDownCounter(metrics.connectionCount, -1, + Collections.singletonList(target), labelValues); + // Final collection on close + recordTcpInfo(channel, true); + } + } + + void recordTcpInfo(Channel channel) { + recordTcpInfo(channel, false); + } + + void recordTcpInfo(Channel channel, boolean isClosed) { + if (metricRecorder == null || target == null) { + return; + } + java.util.List labelValues = getLabelValues(channel); + try { + if (channelReflectionAccessor == null) { + if (!channel.getClass().getName().equals(epollSocketChannelClassName)) { + return; + } + synchronized (accessorCache) { + channelReflectionAccessor = accessorCache.get(epollTcpInfoClassName); + if (channelReflectionAccessor == null) { + Class tcpInfoClass = Class.forName(epollTcpInfoClassName); + Method tcpInfoMethod = channel.getClass().getMethod("tcpInfo", tcpInfoClass); + Constructor tcpInfoConstructor = tcpInfoClass.getDeclaredConstructor(); + Method totalRetransMethod = tcpInfoClass.getMethod("totalRetrans"); + Method retransmitsMethod = tcpInfoClass.getMethod("retransmits"); + Method rttMethod = tcpInfoClass.getMethod("rtt"); + + channelReflectionAccessor = new ChannelReflectionAccessor( + tcpInfoMethod, tcpInfoConstructor, totalRetransMethod, retransmitsMethod, + rttMethod); + accessorCache.put(epollTcpInfoClassName, channelReflectionAccessor); + } + } + } + + Object info = channelReflectionAccessor.tcpInfoConstructor.newInstance(); + channelReflectionAccessor.tcpInfoMethod.invoke(channel, info); + + long totalRetrans = (Long) channelReflectionAccessor.totalRetransMethod.invoke(info); + int retransmits = (Integer) channelReflectionAccessor.retransmitsMethod.invoke(info); + long rtt = (Long) channelReflectionAccessor.rttMethod.invoke(info); + + if (metrics.packetsRetransmitted != null) { + long delta = totalRetrans - lastTotalRetrans; + if (delta > 0) { + metricRecorder.addLongCounter(metrics.packetsRetransmitted, delta, + Collections.singletonList(target), labelValues); + } + lastTotalRetrans = totalRetrans; + } + if (isClosed && metrics.recurringRetransmits != null) { + metricRecorder.addLongCounter(metrics.recurringRetransmits, retransmits, + Collections.singletonList(target), labelValues); + } + if (metrics.minRtt != null) { + metricRecorder.recordDoubleHistogram(metrics.minRtt, + rtt / 1000000.0, // Convert microseconds to seconds + Collections.singletonList(target), labelValues); + } + } catch (Throwable t) { + // Epoll not available or error getting tcp_info, just ignore. + } + } + } + + private static java.util.List getLabelValues(Channel channel) { + String localAddress = ""; + String localPort = ""; + String peerAddress = ""; + String peerPort = ""; + + java.net.SocketAddress local = channel.localAddress(); + if (local instanceof java.net.InetSocketAddress) { + java.net.InetSocketAddress inetLocal = (java.net.InetSocketAddress) local; + localAddress = inetLocal.getAddress().getHostAddress(); + localPort = String.valueOf(inetLocal.getPort()); + } + + java.net.SocketAddress remote = channel.remoteAddress(); + if (remote instanceof java.net.InetSocketAddress) { + java.net.InetSocketAddress inetRemote = (java.net.InetSocketAddress) remote; + peerAddress = inetRemote.getAddress().getHostAddress(); + peerPort = String.valueOf(inetRemote.getPort()); + } + + return java.util.Arrays.asList(localAddress, localPort, peerAddress, peerPort); + } + + + private TcpMetrics() {} +} diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 53598727efd..599aecf80b2 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -1165,7 +1165,8 @@ public Stopwatch get() { Attributes.EMPTY, "someauthority", null, - fakeClock().getTicker()); + fakeClock().getTicker(), + new io.grpc.MetricRecorder() {}); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index db44c8f50fd..c4483256e48 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -251,6 +251,7 @@ public void setSoLingerChannelOption() throws IOException, GeneralSecurityExcept new SocketPicker(), new FakeChannelLogger(), false, + new io.grpc.MetricRecorder() {} , Ticker.systemTicker()); transports.add(transport); callMeMaybe(transport.start(clientTransportListener)); @@ -526,6 +527,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception { new SocketPicker(), new FakeChannelLogger(), false, + new io.grpc.MetricRecorder() {} , Ticker.systemTicker()); transports.add(transport); @@ -1148,6 +1150,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max new SocketPicker(), new FakeChannelLogger(), false, + new io.grpc.MetricRecorder() {} , Ticker.systemTicker()); transports.add(transport); return transport; @@ -1195,7 +1198,8 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize, MAX_RST_COUNT_DISABLED, 0, Attributes.EMPTY, - channelz); + channelz, + null); server.start(serverListener); address = TestUtils.testServerAddress((InetSocketAddress) server.getListenSocketAddress()); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java index 797cfa95c0e..ca40a647457 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java @@ -189,4 +189,15 @@ public void useNioTransport_shouldNotThrow() { builder.assertEventLoopsAndChannelType(); } + + @Test + public void setMetricRecorder_propagatedToServer() throws Exception { + io.grpc.MetricRecorder recorder = mock(io.grpc.MetricRecorder.class); + builder.setMetricRecorder(recorder); + + NettyServer server = builder.buildTransportServers( + ImmutableList.of()); + + assertThat(server.getMetricRecorder()).isSameInstanceAs(recorder); + } } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 0d5a9bab176..a5ab42d15a6 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -1416,7 +1416,8 @@ protected NettyServerHandler newHandler() { maxRstCount, maxRstPeriodNanos, Attributes.EMPTY, - fakeClock().getTicker()); + fakeClock().getTicker(), + new io.grpc.MetricRecorder() {}); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index f9bda4c5af1..406c370cf58 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -161,7 +161,8 @@ class NoHandlerProtocolNegotiator implements ProtocolNegotiator { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, + null); final SettableFuture serverShutdownCalled = SettableFuture.create(); ns.start(new ServerListener() { @Override @@ -218,7 +219,8 @@ public void multiPortStartStopGet() throws Exception { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, + null); final SettableFuture shutdownCompleted = SettableFuture.create(); ns.start(new ServerListener() { @Override @@ -298,7 +300,8 @@ public void multiPortConnections() throws Exception { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, + null); final SettableFuture shutdownCompleted = SettableFuture.create(); ns.start(new ServerListener() { @Override @@ -366,7 +369,8 @@ public void getPort_notStarted() { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, + null); assertThat(ns.getListenSocketAddress()).isEqualTo(addr); assertThat(ns.getListenSocketAddresses()).isEqualTo(addresses); @@ -447,7 +451,8 @@ class TestProtocolNegotiator implements ProtocolNegotiator { 0, 0, // ignore eagAttributes, - channelz); + channelz, + null); ns.start(new ServerListener() { @Override public ServerTransportListener transportCreated(ServerTransport transport) { @@ -501,7 +506,8 @@ public void channelzListenSocket() throws Exception { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, + null); final SettableFuture shutdownCompleted = SettableFuture.create(); ns.start(new ServerListener() { @Override @@ -649,7 +655,8 @@ private NettyServer getServer(List addr, EventLoopGroup ev) { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, + null); } private static class NoopServerTransportListener implements ServerTransportListener { diff --git a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java new file mode 100644 index 00000000000..de18137e767 --- /dev/null +++ b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java @@ -0,0 +1,391 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.grpc.MetricRecorder; +import io.netty.channel.Channel; +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.ScheduledFuture; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class TcpMetricsTest { + + @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + + @Mock private MetricRecorder metricRecorder; + @Mock private Channel channel; + @Mock + private EventLoop eventLoop; + @Mock + private ScheduledFuture scheduledFuture; + + private TcpMetrics.Tracker metrics; + + @Before + public void setUp() { + when(channel.eventLoop()).thenReturn(eventLoop); + when(eventLoop.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenAnswer(invocation -> scheduledFuture); + metrics = new TcpMetrics.Tracker(metricRecorder, "target1"); + } + + @Test + public void metricsInitialization_epollUnavailable() { + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), false); + + org.junit.Assert.assertNotNull(metrics.connectionsCreated); + org.junit.Assert.assertNotNull(metrics.connectionCount); + org.junit.Assert.assertNull(metrics.packetsRetransmitted); + org.junit.Assert.assertNull(metrics.recurringRetransmits); + org.junit.Assert.assertNull(metrics.minRtt); + } + + @Test + public void metricsInitialization_epollAvailable() { + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), true); + + org.junit.Assert.assertNotNull(metrics.connectionsCreated); + org.junit.Assert.assertNotNull(metrics.connectionCount); + org.junit.Assert.assertNotNull(metrics.packetsRetransmitted); + org.junit.Assert.assertNotNull(metrics.recurringRetransmits); + org.junit.Assert.assertNotNull(metrics.minRtt); + } + + @Test + public void safelyRegister_collision() { + io.grpc.MetricInstrumentRegistry registry = + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(); + + // Explicitly register one metric to ensure collision path is triggered + try { + registry.registerLongCounter("grpc.tcp.connections_created", "desc", "unit", + Collections.emptyList(), Collections.emptyList(), false); + } catch (IllegalStateException e) { + // Already exists, which is fine for this test + } + + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics(registry, true); + + org.junit.Assert.assertNotNull(metrics.connectionsCreated); + org.junit.Assert.assertNotNull(metrics.connectionCount); + org.junit.Assert.assertNotNull(metrics.minRtt); + } + + + + public static class FakeEpollTcpInfo { + long totalRetrans; + int retransmits; + long rtt; + + public void setValues(long totalRetrans, int retransmits, long rtt) { + this.totalRetrans = totalRetrans; + this.retransmits = retransmits; + this.rtt = rtt; + } + + public long totalRetrans() { + return totalRetrans; + } + + public int retransmits() { + return retransmits; + } + + public long rtt() { + return rtt; + } + } + + @Test + public void tracker_recordTcpInfo_reflectionSuccess() throws Exception { + MetricRecorder recorder = org.mockito.Mockito.mock(MetricRecorder.class); + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), true); + + String fakeChannelName = ConfigurableFakeWithTcpInfo.class.getName(); + String fakeInfoName = FakeEpollTcpInfo.class.getName(); + + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder, "target", metrics, + fakeChannelName, fakeInfoName); + + FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo(); + infoSource.setValues(123, 4, 5000); + ConfigurableFakeWithTcpInfo channel = new ConfigurableFakeWithTcpInfo(infoSource); + channel.writeInbound("dummy"); + + tracker.channelInactive(channel); + + verify(recorder).addLongCounter(eq(metrics.packetsRetransmitted), eq(123L), any(), any()); + verify(recorder).addLongCounter(eq(metrics.recurringRetransmits), eq(4L), any(), any()); + verify(recorder).recordDoubleHistogram(eq(metrics.minRtt), eq(0.005), any(), any()); + } + + public static class ConfigurableFakeWithTcpInfo extends + io.netty.channel.embedded.EmbeddedChannel { + private final FakeEpollTcpInfo infoToCopy; + + public ConfigurableFakeWithTcpInfo(FakeEpollTcpInfo infoToCopy) { + this.infoToCopy = infoToCopy; + } + + public void tcpInfo(FakeEpollTcpInfo info) { + info.totalRetrans = infoToCopy.totalRetrans; + info.retransmits = infoToCopy.retransmits; + info.rtt = infoToCopy.rtt; + } + } + + @Test + public void tracker_reportsDeltas_correctly() throws Exception { + MetricRecorder recorder = org.mockito.Mockito.mock(MetricRecorder.class); + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), true); + + String fakeChannelName = ConfigurableFakeWithTcpInfo.class.getName(); + String fakeInfoName = FakeEpollTcpInfo.class.getName(); + + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder, "target", metrics, + fakeChannelName, fakeInfoName); + + FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo(); + ConfigurableFakeWithTcpInfo channel = new ConfigurableFakeWithTcpInfo(infoSource); + + // 10 retransmits total + infoSource.setValues(10, 2, 1000); + tracker.recordTcpInfo(channel); + + verify(recorder).addLongCounter(eq(metrics.packetsRetransmitted), eq(10L), any(), any()); + + // 15 retransmits total (delta 5) + infoSource.setValues(15, 0, 1000); + tracker.recordTcpInfo(channel); + + verify(recorder).addLongCounter(eq(metrics.packetsRetransmitted), eq(5L), any(), any()); + + // 15 retransmits total (delta 0) - should NOT report + tracker.recordTcpInfo(channel); + // Verify no new interactions with this specific metric and value + // We can't easy verify "no interaction" for specific value without capturing. + verify(recorder, org.mockito.Mockito.times(1)).addLongCounter(eq(metrics.packetsRetransmitted), + eq(10L), any(), any()); + verify(recorder, org.mockito.Mockito.times(1)).addLongCounter(eq(metrics.packetsRetransmitted), + eq(5L), any(), any()); + // Total interactions for packetsRetransmitted should be 2 + verify(recorder, org.mockito.Mockito.times(2)).addLongCounter(eq(metrics.packetsRetransmitted), + anyLong(), any(), any()); + + // recurringRetransmits should NOT have been reported yet (periodic calls) + verify(recorder, org.mockito.Mockito.times(0)).addLongCounter(eq(metrics.recurringRetransmits), + anyLong(), any(), any()); + + // Close channel - should report recurringRetransmits + tracker.channelInactive(channel); + verify(recorder, org.mockito.Mockito.times(1)).addLongCounter(eq(metrics.recurringRetransmits), + eq(0L), // From last infoSource setValues(15, 0, 1000) + any(), any()); + } + + @Test + public void tracker_recordTcpInfo_reflectionFailure() { + MetricRecorder recorder = org.mockito.Mockito.mock(MetricRecorder.class); + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), true); + + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder, "target", metrics, + "non.existent.Class", "non.existent.Info"); + + Channel channel = org.mockito.Mockito.mock(Channel.class); + when(channel.isActive()).thenReturn(true); + + // Should catch exception and ignore + tracker.channelInactive(channel); + } + + @Test + public void registeredMetrics_haveCorrectOptionalLabels() { + java.util.List expectedOptionalLabels = Arrays.asList( + "network.local.address", + "network.local.port", + "network.peer.address", + "network.peer.port" + ); + + org.junit.Assert.assertEquals( + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().connectionsCreated.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().connectionCount.getOptionalLabelKeys()); + + if (TcpMetrics.getDefaultMetrics().packetsRetransmitted != null) { + org.junit.Assert.assertEquals( + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().packetsRetransmitted.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().recurringRetransmits.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, TcpMetrics.getDefaultMetrics().minRtt.getOptionalLabelKeys()); + } + } + + @Test + public void channelActive_extractsLabels_ipv4() throws Exception { + + InetAddress localInet = InetAddress.getByAddress(new byte[] { 127, 0, 0, 1 }); + InetAddress remoteInet = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 }); + when(channel.localAddress()).thenReturn(new InetSocketAddress(localInet, 8080)); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress(remoteInet, 443)); + + metrics.channelActive(channel); + + verify(metricRecorder).addLongCounter( + eq(TcpMetrics.getDefaultMetrics().connectionsCreated), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelInactive_extractsLabels_ipv6() throws Exception { + + InetAddress localInet = InetAddress.getByAddress( + new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 }); + InetAddress remoteInet = InetAddress.getByAddress( + new byte[] { 32, 1, 13, -72, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 }); + when(channel.localAddress()).thenReturn(new InetSocketAddress(localInet, 8080)); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress(remoteInet, 443)); + + metrics.channelInactive(channel); + + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(-1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelActive_extractsLabels_nonInetAddress() throws Exception { + SocketAddress dummyAddress = new SocketAddress() {}; + when(channel.localAddress()).thenReturn(dummyAddress); + when(channel.remoteAddress()).thenReturn(dummyAddress); + + metrics.channelActive(channel); + + verify(metricRecorder).addLongCounter( + eq(TcpMetrics.getDefaultMetrics().connectionsCreated), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelActive_incrementsCounts() { + metrics.channelActive(channel); + verify(metricRecorder).addLongCounter( + eq(TcpMetrics.getDefaultMetrics().connectionsCreated), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelInactive_decrementsCount_noEpoll_noError() { + metrics.channelInactive(channel); + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(-1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelActive_schedulesReportTimer() { + when(channel.isActive()).thenReturn(true); + metrics.channelActive(channel); + + ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + verify(eventLoop).schedule( + runnableCaptor.capture(), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS)); + + Runnable task = runnableCaptor.getValue(); + long delay = delayCaptor.getValue(); + + // Default RECORD_INTERVAL_MILLIS is 5 minutes (300,000 ms) + // Jitter is 10% to 110%, so 30,000 ms to 330,000 ms + org.junit.Assert.assertTrue("Delay should be >= 30000 but was " + delay, delay >= 30_000); + org.junit.Assert.assertTrue("Delay should be <= 330000 but was " + delay, delay <= 330_000); + + // Run the task to verify rescheduling + task.run(); + + verify(eventLoop, org.mockito.Mockito.times(2)) + .schedule(any(Runnable.class), anyLong(), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void channelInactive_cancelsReportTimer() { + when(channel.isActive()).thenReturn(true); + metrics.channelActive(channel); + + metrics.channelInactive(channel); + + verify(scheduledFuture).cancel(false); + } +}