Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private GrpcOpenTelemetry(Builder builder) {
this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels);
this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(
STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins,
builder.targetFilter);
openTelemetrySdk.getPropagators(), builder.targetFilter);
this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk);
this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -100,24 +101,28 @@ final class OpenTelemetryMetricsModule {
private final boolean localityEnabled;
private final boolean backendServiceEnabled;
private final ImmutableList<OpenTelemetryPlugin> plugins;
private final ContextPropagators contextPropagators;
@Nullable
private final TargetFilter targetAttributeFilter;

OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource,
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
this(stopwatchSupplier, resource, optionalLabels, plugins, null);
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
ContextPropagators contextPropagators) {
this(stopwatchSupplier, resource, optionalLabels, plugins, contextPropagators, null);
}

OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource,
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
@Nullable TargetFilter targetAttributeFilter) {
ContextPropagators contextPropagators,
@Nullable TargetFilter targetAttributeFilter) {
this.resource = checkNotNull(resource, "resource");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
this.plugins = ImmutableList.copyOf(plugins);
this.contextPropagators = checkNotNull(contextPropagators, "contextPropagators");
this.targetAttributeFilter = targetAttributeFilter;
}

Expand Down Expand Up @@ -159,8 +164,7 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
return isGeneratedMethod ? fullMethodName : "other";
}

private static Context otelContextWithBaggage() {
Baggage baggage = BAGGAGE_KEY.get();
private static Context otelContextWithBaggage(Baggage baggage) {
if (baggage == null) {
return Context.current();
}
Expand Down Expand Up @@ -282,7 +286,7 @@ public void streamClosed(Status status) {
}

void recordFinishedAttempt() {
Context otelContext = otelContextWithBaggage();
Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get());
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target)
Expand Down Expand Up @@ -448,7 +452,7 @@ void callEnded(Status status) {
}

void recordFinishedCall() {
Context otelContext = otelContextWithBaggage();
Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get());
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = newClientTracer(null);
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -553,13 +557,15 @@ private static final class ServerTracer extends ServerStreamTracer {
private final Stopwatch stopwatch;
private volatile long outboundWireSize;
private volatile long inboundWireSize;
private final Context otelContext;

ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins, Context otelContext) {
this.module = checkNotNull(module, "module");
this.fullMethodName = fullMethodName;
this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
this.stopwatch = module.stopwatchSupplier.get().start();
this.otelContext = checkNotNull(otelContext, "otelContext");
}

@Override
Expand All @@ -574,7 +580,7 @@ public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));

if (module.resource.serverCallCountCounter() != null) {
module.resource.serverCallCountCounter().add(1, attribute);
module.resource.serverCallCountCounter().add(1, attribute, otelContext);
}
}

Expand Down Expand Up @@ -606,7 +612,6 @@ public void inboundWireSize(long bytes) {
*/
@Override
public void streamClosed(Status status) {
Context otelContext = otelContextWithBaggage();
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
Expand Down Expand Up @@ -657,7 +662,10 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
}
streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
}
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
Context context = contextPropagators.getTextMapPropagator().extract(
Context.current(), headers, MetadataGetter.getInstance());
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins,
context);
}
}

Expand Down Expand Up @@ -717,3 +725,4 @@ public void onClose(Status status, Metadata trailers) {
}
}
}

Loading