From cf78cef966bfbd73c643396059b5b95c16ee5770 Mon Sep 17 00:00:00 2001 From: agravator Date: Mon, 23 Feb 2026 22:00:27 +0530 Subject: [PATCH 1/5] fix otel baggage prop --- .../grpc/opentelemetry/GrpcOpenTelemetry.java | 2 +- .../OpenTelemetryMetricsModule.java | 26 ++++++++++++------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 6904340ac74..444a0246391 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -101,7 +101,7 @@ private GrpcOpenTelemetry(Builder builder) { this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels); this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule( STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins, - builder.targetFilter); + openTelemetrySdk.getPropagators(), builder.targetFilter); this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk); this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels); } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index b05884305dc..55fb19739a9 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -49,6 +49,7 @@ import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -100,24 +101,28 @@ final class OpenTelemetryMetricsModule { private final boolean localityEnabled; private final boolean backendServiceEnabled; private final ImmutableList plugins; + private final ContextPropagators aggregators; @Nullable private final TargetFilter targetAttributeFilter; OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, - Collection optionalLabels, List plugins) { - this(stopwatchSupplier, resource, optionalLabels, plugins, null); + Collection optionalLabels, List plugins, + ContextPropagators aggregators) { + this(stopwatchSupplier, resource, optionalLabels, plugins, aggregators, null); } OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, Collection optionalLabels, List plugins, + ContextPropagators aggregators, @Nullable TargetFilter targetAttributeFilter) { this.resource = checkNotNull(resource, "resource"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey()); this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey()); this.plugins = ImmutableList.copyOf(plugins); + this.aggregators = checkNotNull(aggregators, "aggregators"); this.targetAttributeFilter = targetAttributeFilter; } @@ -159,8 +164,7 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) return isGeneratedMethod ? fullMethodName : "other"; } - private static Context otelContextWithBaggage() { - Baggage baggage = BAGGAGE_KEY.get(); + private static Context otelContextWithBaggage(Baggage baggage) { if (baggage == null) { return Context.current(); } @@ -282,7 +286,7 @@ public void streamClosed(Status status) { } void recordFinishedAttempt() { - Context otelContext = otelContextWithBaggage(); + Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get()); AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() .put(METHOD_KEY, fullMethodName) .put(TARGET_KEY, target) @@ -448,7 +452,7 @@ void callEnded(Status status) { } void recordFinishedCall() { - Context otelContext = otelContextWithBaggage(); + Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get()); if (attemptsPerCall.get() == 0) { ClientTracer tracer = newClientTracer(null); tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS); @@ -553,13 +557,15 @@ private static final class ServerTracer extends ServerStreamTracer { private final Stopwatch stopwatch; private volatile long outboundWireSize; private volatile long inboundWireSize; + private final Context otelContext; ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName, - List streamPlugins) { + List streamPlugins, Context otelContext) { this.module = checkNotNull(module, "module"); this.fullMethodName = fullMethodName; this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins"); this.stopwatch = module.stopwatchSupplier.get().start(); + this.otelContext = checkNotNull(otelContext, "otelContext"); } @Override @@ -606,7 +612,6 @@ public void inboundWireSize(long bytes) { */ @Override public void streamClosed(Status status) { - Context otelContext = otelContextWithBaggage(); if (streamClosedUpdater != null) { if (streamClosedUpdater.getAndSet(this, 1) != 0) { return; @@ -657,7 +662,10 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata } streamPlugins = Collections.unmodifiableList(streamPluginsMutable); } - return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins); + Context context = aggregators.getTextMapPropagator().extract( + Context.current(), headers, MetadataGetter.getInstance()); + return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins, + context); } } From 661c9411503022ceca073a5997ff55357fde8c36 Mon Sep 17 00:00:00 2001 From: agravator Date: Tue, 24 Feb 2026 09:40:44 +0530 Subject: [PATCH 2/5] fix test case --- .../OpenTelemetryMetricsModuleTest.java | 63 +++++++++++-------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 391f94cefea..d14e596beb1 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -65,6 +65,8 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; @@ -1245,7 +1247,8 @@ public void clientLocalityMetrics_present() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), + emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1313,7 +1316,8 @@ public void clientLocalityMetrics_missing() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), + emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1378,7 +1382,7 @@ public void clientBackendServiceMetrics_present() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), - emptyList()); + emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1447,7 +1451,7 @@ public void clientBackendServiceMetrics_missing() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), - emptyList()); + emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1631,37 +1635,39 @@ public void serverBasicMetrics() { @Test public void serverBaggagePropagationToMetrics() { - // 1. Create module and tracer factory using the mock resource - OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); - ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); - ServerStreamTracer tracer = - tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); - - // 2. Define the test baggage and gRPC context + // 1. Define the test baggage Baggage testBaggage = Baggage.builder() .put("user-id", "67") .build(); - // This simulates the context that the Tracing module would have created - io.grpc.Context grpcContext = io.grpc.Context.current() - .withValue(OpenTelemetryConstants.BAGGAGE_KEY, testBaggage); + // 2. Inject baggage into headers + Metadata headers = new Metadata(); + openTelemetryTesting.getOpenTelemetry().getPropagators().getTextMapPropagator() + .inject(Context.root().with(testBaggage), headers, new TextMapSetter() { + @Override + public void set(Metadata carrier, String key, String value) { + carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); + } + }); + + // 3. Create module and tracer factory using the mock resource + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); + ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); + ServerStreamTracer tracer = tracerFactory.newServerStreamTracer(method.getFullMethodName(), + headers); - // 3. Attach the gRPC context, trigger metric recording, and detach - io.grpc.Context previousContext = grpcContext.attach(); - try { - tracer.streamClosed(Status.OK); - } finally { - grpcContext.detach(previousContext); - } + // 4. Trigger metric recording + tracer.streamClosed(Status.OK); - // 4. Verify the record call and capture the OTel Context + // 5. Verify the record call and capture the OTel Context verify(mockServerCallDurationHistogram).record( anyDouble(), any(io.opentelemetry.api.common.Attributes.class), contextCaptor.capture()); - // 5. Assert on the captured OTel Context + // 6. Assert on the captured OTel Context io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue(); Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext); @@ -1802,13 +1808,15 @@ public void targetAttributeFilter_rejectsTarget_mapsToOther() { private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource) { return new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); } private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource, TargetFilter filter) { return new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), filter); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators(), filter); } static class CallInfo extends ServerCallInfo { @@ -1848,7 +1856,8 @@ public void serverBaggagePropagation_EndToEnd() throws Exception { OpenTelemetry otel = openTelemetryTesting.getOpenTelemetry(); OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(otel); OpenTelemetryMetricsModule metricsModule = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + otel.getPropagators()); // 2. Create Server with *both* tracer factories server = InProcessServerBuilder.forName(serverName) From 396adea656550ac547103ff8bea68bcf5f819d65 Mon Sep 17 00:00:00 2001 From: agravator Date: Wed, 25 Feb 2026 13:41:23 +0530 Subject: [PATCH 3/5] otel: add tests --- .../OpenTelemetryMetricsModule.java | 17 +- .../OpenTelemetryMetricsModuleTest.java | 180 ++++++++++++++++++ 2 files changed, 189 insertions(+), 8 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index 55fb19739a9..df8230cda83 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -101,28 +101,28 @@ final class OpenTelemetryMetricsModule { private final boolean localityEnabled; private final boolean backendServiceEnabled; private final ImmutableList plugins; - private final ContextPropagators aggregators; + private final ContextPropagators contextPropagators; @Nullable private final TargetFilter targetAttributeFilter; OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, Collection optionalLabels, List plugins, - ContextPropagators aggregators) { - this(stopwatchSupplier, resource, optionalLabels, plugins, aggregators, null); + ContextPropagators contextPropagators) { + this(stopwatchSupplier, resource, optionalLabels, plugins, contextPropagators, null); } OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, Collection optionalLabels, List plugins, - ContextPropagators aggregators, - @Nullable TargetFilter targetAttributeFilter) { + ContextPropagators contextPropagators, + @Nullable TargetFilter targetAttributeFilter) { this.resource = checkNotNull(resource, "resource"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey()); this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey()); this.plugins = ImmutableList.copyOf(plugins); - this.aggregators = checkNotNull(aggregators, "aggregators"); + this.contextPropagators = checkNotNull(contextPropagators, "contextPropagators"); this.targetAttributeFilter = targetAttributeFilter; } @@ -580,7 +580,7 @@ public void serverCallStarted(ServerCallInfo callInfo) { METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing)); if (module.resource.serverCallCountCounter() != null) { - module.resource.serverCallCountCounter().add(1, attribute); + module.resource.serverCallCountCounter().add(1, attribute, otelContext); } } @@ -662,7 +662,7 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata } streamPlugins = Collections.unmodifiableList(streamPluginsMutable); } - Context context = aggregators.getTextMapPropagator().extract( + Context context = contextPropagators.getTextMapPropagator().extract( Context.current(), headers, MetadataGetter.getInstance()); return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins, context); @@ -725,3 +725,4 @@ public void onClose(Status status, Metadata trailers) { } } } + diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index d14e596beb1..3b4ab1732b1 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -28,8 +28,13 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.grpc.Attributes; import io.grpc.CallOptions; @@ -38,6 +43,7 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import io.grpc.ClientStreamTracer; +import io.grpc.ForwardingClientCall; import io.grpc.KnownLength; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -62,10 +68,15 @@ import io.grpc.testing.protobuf.SimpleServiceGrpc; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.MetricData; @@ -76,6 +87,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -96,6 +109,7 @@ */ @RunWith(JUnit4.class) public class OpenTelemetryMetricsModuleTest { + // ... existing code ... private static final CallOptions.Key CUSTOM_OPTION = CallOptions.Key.createWithDefault("option1", "default"); @@ -1910,4 +1924,170 @@ public void unaryRpc(SimpleRequest request, StreamObserver respo responseObserver.onCompleted(); } } + + @Test + public void serverMetricsShouldRecordContextWithBaggage() { + // Mocks + DoubleHistogram serverCallDurationCounter = mock(DoubleHistogram.class); + OpenTelemetryMetricsResource resource = mock(OpenTelemetryMetricsResource.class); + when(resource.serverCallDurationCounter()).thenReturn(serverCallDurationCounter); + + // ContextPropagators with Baggage + ContextPropagators propagators = ContextPropagators.create( + TextMapPropagator.composite(W3CBaggagePropagator.getInstance())); + + // Module + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + Stopwatch::createUnstarted, + resource, + ImmutableList.of(), + ImmutableList.of(), + propagators); + + // Baggage to inject + Baggage baggage = Baggage.builder().put("my-baggage-key", "my-baggage-value").build(); + Metadata headers = new Metadata(); + propagators.getTextMapPropagator().inject(Context.root().with(baggage), headers, + new MetadataSetter()); + + // Create Tracer + io.grpc.ServerStreamTracer.Factory factory = module.getServerTracerFactory(); + io.grpc.ServerStreamTracer tracer = factory.newServerStreamTracer("test/method", headers); + + // Close stream logic + tracer.streamClosed(Status.OK); + + // Verify record called with context (which should have baggage) + verify(serverCallDurationCounter).record( + anyDouble(), + any(), + org.mockito.ArgumentMatchers.argThat(ctx -> { + Baggage b = Baggage.fromContext(ctx); + return "my-baggage-value".equals(b.getEntryValue("my-baggage-key")); + })); + } + + @Test + public void serverMetrics_withExternalExecutor_propagatesBaggage() throws Exception { + // Setup Mocks & Resource + DoubleHistogram serverCallDurationCounter = mock(DoubleHistogram.class); + LongCounter serverCallCountCounter = mock(LongCounter.class); + LongHistogram serverTotalSentCompressedMessageSizeCounter = mock(LongHistogram.class); + LongHistogram serverTotalReceivedCompressedMessageSizeCounter = mock(LongHistogram.class); + OpenTelemetryMetricsResource resource = mock(OpenTelemetryMetricsResource.class); + when(resource.serverCallDurationCounter()).thenReturn(serverCallDurationCounter); + when(resource.serverCallCountCounter()).thenReturn(serverCallCountCounter); + when(resource.serverTotalSentCompressedMessageSizeCounter()) + .thenReturn(serverTotalSentCompressedMessageSizeCounter); + when(resource.serverTotalReceivedCompressedMessageSizeCounter()) + .thenReturn(serverTotalReceivedCompressedMessageSizeCounter); + + // Setup Propagators + ContextPropagators propagators = ContextPropagators.create(W3CBaggagePropagator.getInstance()); + + // Initialize Module + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + Stopwatch::createUnstarted, + resource, + ImmutableList.of(), + ImmutableList.of(), + propagators); + + // Setup Server with Wrapped Executor (Custom Executor) + ExecutorService customExecutor = Executors.newFixedThreadPool(2); + java.util.concurrent.Executor rawExecutor = customExecutor; + + String serverName = InProcessServerBuilder.generateName(); + io.grpc.Server server = InProcessServerBuilder.forName(serverName) + .executor(rawExecutor) + .addService(new SimpleServiceGrpc.SimpleServiceImplBase() { + @Override + public void unaryRpc( + SimpleRequest request, + StreamObserver responseObserver) { + responseObserver.onNext(SimpleResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + }) + .addStreamTracerFactory(module.getServerTracerFactory()) + .build().start(); + + // Client Interceptor to inject baggage + ClientInterceptor baggageInterceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + propagators.getTextMapPropagator().inject(Context.current(), headers, + new MetadataSetter()); + super.start(responseListener, headers); + } + }; + } + }; + + // Setup Client and Inject Baggage + io.grpc.ManagedChannel channel = InProcessChannelBuilder.forName(serverName) + .intercept(baggageInterceptor) + .directExecutor() + .build(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc + .newBlockingStub(channel); + + // Define multiple Baggage items + Baggage testBaggage = Baggage.builder() + .put("key1", "value1") + .put("key2", "value/with/special:chars") + .build(); + + // Make the call with Baggage in Context + try (io.opentelemetry.context.Scope scope = Context.current().with(testBaggage).makeCurrent()) { + stub.unaryRpc(SimpleRequest.getDefaultInstance()); + } + + // Shutdown and Wait + channel.shutdownNow(); + server.shutdown().awaitTermination(5, TimeUnit.SECONDS); + customExecutor.shutdownNow(); + + // Verification Logic for Baggage + org.mockito.ArgumentMatcher baggageMatcher = ctx -> { + Baggage b = Baggage.fromContext(ctx); + return "value1".equals(b.getEntryValue("key1")) + && "value/with/special:chars".equals(b.getEntryValue("key2")); + }; + + // Verify all metrics recorded with correct baggage + // Use timeout to avoid race conditions as metrics might be recorded + // asynchronously + verify(serverCallDurationCounter, timeout(5000)).record( + anyDouble(), + any(), // Attributes + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); + + verify(serverCallCountCounter, timeout(5000)).add( + org.mockito.ArgumentMatchers.eq(1L), + any(), // Attributes + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); + + verify(serverTotalSentCompressedMessageSizeCounter, timeout(5000)).record( + org.mockito.ArgumentMatchers.anyLong(), + any(), // Attributes + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); + + verify(serverTotalReceivedCompressedMessageSizeCounter, timeout(5000)).record( + org.mockito.ArgumentMatchers.anyLong(), + any(), // Attributes + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); + } + + private static class MetadataSetter implements TextMapSetter { + @Override + public void set(Metadata carrier, String key, String value) { + carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); + } + } } From 96dccf448a8bf81f77b3b9d655b56764d6657562 Mon Sep 17 00:00:00 2001 From: agravator Date: Wed, 25 Feb 2026 14:10:13 +0530 Subject: [PATCH 4/5] fix format --- .../OpenTelemetryMetricsModuleTest.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 3b4ab1732b1..5e4c11804e7 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -31,7 +31,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; @@ -1929,8 +1928,9 @@ public void unaryRpc(SimpleRequest request, StreamObserver respo public void serverMetricsShouldRecordContextWithBaggage() { // Mocks DoubleHistogram serverCallDurationCounter = mock(DoubleHistogram.class); - OpenTelemetryMetricsResource resource = mock(OpenTelemetryMetricsResource.class); - when(resource.serverCallDurationCounter()).thenReturn(serverCallDurationCounter); + OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() + .serverCallDurationCounter(serverCallDurationCounter) + .build(); // ContextPropagators with Baggage ContextPropagators propagators = ContextPropagators.create( @@ -1974,13 +1974,14 @@ public void serverMetrics_withExternalExecutor_propagatesBaggage() throws Except LongCounter serverCallCountCounter = mock(LongCounter.class); LongHistogram serverTotalSentCompressedMessageSizeCounter = mock(LongHistogram.class); LongHistogram serverTotalReceivedCompressedMessageSizeCounter = mock(LongHistogram.class); - OpenTelemetryMetricsResource resource = mock(OpenTelemetryMetricsResource.class); - when(resource.serverCallDurationCounter()).thenReturn(serverCallDurationCounter); - when(resource.serverCallCountCounter()).thenReturn(serverCallCountCounter); - when(resource.serverTotalSentCompressedMessageSizeCounter()) - .thenReturn(serverTotalSentCompressedMessageSizeCounter); - when(resource.serverTotalReceivedCompressedMessageSizeCounter()) - .thenReturn(serverTotalReceivedCompressedMessageSizeCounter); + OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() + .serverCallDurationCounter(serverCallDurationCounter) + .serverCallCountCounter(serverCallCountCounter) + .serverTotalSentCompressedMessageSizeCounter( + serverTotalSentCompressedMessageSizeCounter) + .serverTotalReceivedCompressedMessageSizeCounter( + serverTotalReceivedCompressedMessageSizeCounter) + .build(); // Setup Propagators ContextPropagators propagators = ContextPropagators.create(W3CBaggagePropagator.getInstance()); From 04daa070c71c7b01f7b73c51d261919eead2a75a Mon Sep 17 00:00:00 2001 From: agravator Date: Thu, 26 Feb 2026 15:00:45 +0530 Subject: [PATCH 5/5] cover missing cases --- .../OpenTelemetryMetricsModule.java | 12 +- .../OpenTelemetryMetricsModuleTest.java | 247 ++++++++++++++++++ 2 files changed, 256 insertions(+), 3 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index df8230cda83..d14ffb9abc6 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -632,17 +632,23 @@ public void streamClosed(Status status) { } io.opentelemetry.api.common.Attributes attributes = builder.build(); + Context ctxToRecord = otelContext; + Baggage currentBaggage = BAGGAGE_KEY.get(); + if (currentBaggage != null && !currentBaggage.isEmpty()) { + ctxToRecord = ctxToRecord.with(currentBaggage); + } + if (module.resource.serverCallDurationCounter() != null) { module.resource.serverCallDurationCounter() - .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext); + .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, ctxToRecord); } if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) { module.resource.serverTotalSentCompressedMessageSizeCounter() - .record(outboundWireSize, attributes, otelContext); + .record(outboundWireSize, attributes, ctxToRecord); } if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) { module.resource.serverTotalReceivedCompressedMessageSizeCounter() - .record(inboundWireSize, attributes, otelContext); + .record(inboundWireSize, attributes, ctxToRecord); } } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 5e4c11804e7..cd3ba2dd1c0 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -17,6 +17,7 @@ package io.grpc.opentelemetry; import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY; @@ -42,12 +43,14 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import io.grpc.ClientStreamTracer; +import io.grpc.Contexts; import io.grpc.ForwardingClientCall; import io.grpc.KnownLength; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer.ServerCallInfo; @@ -59,7 +62,9 @@ import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; +import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; +import io.grpc.stub.ServerCalls; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcServerRule; import io.grpc.testing.protobuf.SimpleRequest; @@ -83,6 +88,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -2091,4 +2097,245 @@ public void set(Metadata carrier, String key, String value) { carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); } } + + @Test + public void clientMetric_baggagePropagation_externalExecutor() throws Exception { + String target = "target:///"; + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + // Mock the instrument to verify Context + DoubleHistogram mockHistogram = mock(DoubleHistogram.class); + OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() + .clientAttemptDurationCounter(mockHistogram) + .build(); + + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); + + MethodDescriptor methodDescriptor = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(MARSHALLER) + .setResponseMarshaller(MARSHALLER) + .setSampledToLocalTracing(true) + .build(); + + ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("service") + .addMethod(methodDescriptor, ServerCalls.asyncUnaryCall( + new ServerCalls.UnaryMethod() { + @Override + public void invoke(String req, StreamObserver responseObserver) { + responseObserver.onNext("Response"); + responseObserver.onCompleted(); + } + })) + .build(); + + server = InProcessServerBuilder.forName("client-baggage-test") + .directExecutor() + .addService(serviceDef) + .build().start(); + + InProcessChannelBuilder channelBuilder = + InProcessChannelBuilder.forName("client-baggage-test") + .executor(executor); + + // Use the module's interceptor + ClientInterceptor interceptor = module.getClientInterceptor(target); + channel = channelBuilder.intercept(interceptor).build(); + + Baggage baggage = Baggage.builder().put("client_key", "client_value").build(); + try (io.opentelemetry.context.Scope scope = Context.current().with(baggage).makeCurrent()) { + ClientCalls.blockingUnaryCall(channel, methodDescriptor, CallOptions.DEFAULT, "Request"); + } + + ArgumentCaptor contextCaptor = ArgumentCaptor.forClass(Context.class); + + // Use atLeastOnce() and timeout to allow for async execution + verify(mockHistogram, timeout(1000).atLeastOnce()) + .record(anyDouble(), any(io.opentelemetry.api.common.Attributes.class), + contextCaptor.capture()); + + boolean found = false; + for (Context ctx : contextCaptor.getAllValues()) { + Baggage b = Baggage.fromContext(ctx); + if ("client_value".equals(b.getEntryValue("client_key"))) { + found = true; + break; + } + } + assertTrue("Client baggage not found in metrics context", found); + } finally { + executor.shutdown(); + } + } + + @Test + public void serverMetric_baggagePropagation_externalExecutor() throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + // Mock the instrument + DoubleHistogram mockHistogram = mock(DoubleHistogram.class); + OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() + .serverCallDurationCounter(mockHistogram) + .build(); + + // Configure module with propagation + ContextPropagators propagators = + ContextPropagators.create(W3CBaggagePropagator.getInstance()); + + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), propagators); + + MethodDescriptor methodDescriptor = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(MARSHALLER) + .setResponseMarshaller(MARSHALLER) + .setSampledToLocalTracing(true) + .build(); + + ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("service") + .addMethod(methodDescriptor, ServerCalls.asyncUnaryCall( + new ServerCalls.UnaryMethod() { + @Override + public void invoke(String req, StreamObserver responseObserver) { + responseObserver.onNext("Response"); + responseObserver.onCompleted(); + } + })) + .build(); + + // Use external executor by setting it on builder + InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("server-baggage-test") + .executor(executor) + .addService(serviceDef) + .addStreamTracerFactory(module.getServerTracerFactory()); + + server = serverBuilder.build().start(); + + channel = InProcessChannelBuilder.forName("server-baggage-test") + .directExecutor() + .build(); + + // We need to inject Baggage into the call. + ClientInterceptor baggageInjector = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + // Inject baggage manually into headers + Baggage baggage = Baggage.builder().put("server_key", "server_value").build(); + propagators.getTextMapPropagator().inject(Context.current().with(baggage), headers, + new MetadataSetter()); + super.start(responseListener, headers); + } + }; + } + }; + + ClientCalls.blockingUnaryCall( + ClientInterceptors.intercept(channel, baggageInjector), + methodDescriptor, CallOptions.DEFAULT, "Request"); + + ArgumentCaptor contextCaptor = ArgumentCaptor.forClass(Context.class); + + verify(mockHistogram, timeout(1000).atLeastOnce()) + .record(anyDouble(), any(io.opentelemetry.api.common.Attributes.class), + contextCaptor.capture()); + + boolean found = false; + for (Context ctx : contextCaptor.getAllValues()) { + Baggage b = Baggage.fromContext(ctx); + if ("server_value".equals(b.getEntryValue("server_key"))) { + found = true; + break; + } + } + assertTrue("Server baggage not found in metrics context", found); + } finally { + executor.shutdown(); + } + } + + @Test + public void serverMetric_interceptedBaggage() throws Exception { + // This test verifies if baggage added by a ServerInterceptor is visible to + // OpenTelemetry metrics Context. + + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + () -> Stopwatch.createUnstarted(), + resource, + Collections.emptyList(), /* optionalLabels */ + Collections.emptyList(), /* plugins */ + ContextPropagators.noop()); + + ServerInterceptor baggageInterceptor = new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + // Add baggage to the context + Baggage baggage = Baggage.builder() + .put("interceptor_key", "interceptor_value") + .build(); + io.grpc.Context ctx = io.grpc.Context.current().withValue(BAGGAGE_KEY, baggage); + return Contexts.interceptCall(ctx, call, headers, next); + } + }; + + MethodDescriptor methodDescriptor = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + // Matching existing method name in test class + .setFullMethodName("package1.service2/method3") + .setRequestMarshaller(MARSHALLER) // Use existing MARSHALLER + .setResponseMarshaller(MARSHALLER) + .setSampledToLocalTracing(true) + .build(); + + ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("package1.service2") + .addMethod(methodDescriptor, ServerCalls.asyncUnaryCall( + new ServerCalls.UnaryMethod() { + @Override + public void invoke(String req, StreamObserver responseObserver) { + responseObserver.onNext("Response"); + responseObserver.onCompleted(); + } + })) + .build(); + + InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("interceptor-test") + .addService(serviceDef) + .intercept(baggageInterceptor) + .addStreamTracerFactory(module.getServerTracerFactory()); + + server = serverBuilder.build().start(); + + // Use a real channel but we don't need metrics on client side for this test + InProcessChannelBuilder channelBuilder = InProcessChannelBuilder.forName("interceptor-test"); + channel = channelBuilder.build(); + + ClientCalls.blockingUnaryCall(channel, methodDescriptor, CallOptions.DEFAULT, "Request"); + + // Verify that record was called with a Context containing the baggage + // Reusing contextCaptor from the class + verify(mockServerCallDurationHistogram, timeout(1000).atLeastOnce()) + .record(anyDouble(), any(io.opentelemetry.api.common.Attributes.class), + contextCaptor.capture()); + + boolean found = false; + for (io.opentelemetry.context.Context ctx : contextCaptor.getAllValues()) { + // Baggage from OTEL Context + Baggage otelBaggage = Baggage.fromContext(ctx); + if ("interceptor_value".equals(otelBaggage.getEntryValue("interceptor_key"))) { + found = true; + break; + } + } + assertThat(found).isTrue(); + } }