From 8aa7457aea147daa23a82dec7121598756d4212b Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Mon, 30 Mar 2026 14:10:29 -0400 Subject: [PATCH 1/2] CSS v1.3 initial implementation --- .../ddagent/DDAgentFeaturesDiscovery.java | 12 ++ .../DDAgentFeaturesDiscoveryTest.groovy | 16 ++ ...gent-info-with-additional-metric-tags.json | 70 +++++++++ .../metrics/ConflatingMetricsAggregator.java | 41 +++++ .../trace/common/metrics/MetricKey.java | 42 ++++++ .../metrics/SerializingMetricWriter.java | 12 +- .../ConflatingMetricAggregatorTest.groovy | 142 ++++++++++++++++++ .../SerializingMetricWriterTest.groovy | 67 ++++++++- 8 files changed, 400 insertions(+), 2 deletions(-) create mode 100644 communication/src/test/resources/agent-features/agent-info-with-additional-metric-tags.json diff --git a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java index 755094cc2e4..33075891145 100644 --- a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java +++ b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.security.NoSuchAlgorithmException; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -98,6 +99,7 @@ private static class State { String version; String telemetryProxyEndpoint; Set peerTags = emptySet(); + Set additionalMetricTags = emptySet(); long lastTimeDiscovered; } @@ -308,6 +310,12 @@ private boolean processInfoResponse(State newState, String response) { peer_tags instanceof List ? unmodifiableSet(new HashSet<>((List) peer_tags)) : emptySet(); + + Object span_derived_primary_tags = map.get("span_derived_primary_tags"); + newState.additionalMetricTags = + span_derived_primary_tags instanceof List + ? unmodifiableSet(new LinkedHashSet<>((List) span_derived_primary_tags)) + : emptySet(); } try { newState.state = Strings.sha256(response); @@ -396,6 +404,10 @@ public Set peerTags() { return discoveryState.peerTags; } + public Set additionalMetricTags() { + return discoveryState.additionalMetricTags; + } + public String getMetricsEndpoint() { return discoveryState.metricsEndpoint; } diff --git a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy index 505595e55e7..ac0f7f308fa 100644 --- a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy +++ b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy @@ -37,6 +37,8 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification { static final String INFO_STATE = Strings.sha256(INFO_RESPONSE) static final String INFO_WITH_PEER_TAG_BACK_PROPAGATION_RESPONSE = loadJsonFile("agent-info-with-peer-tag-back-propagation.json") static final String INFO_WITH_PEER_TAG_BACK_PROPAGATION_STATE = Strings.sha256(INFO_WITH_PEER_TAG_BACK_PROPAGATION_RESPONSE) + static final String INFO_WITH_ADDITIONAL_METRIC_TAGS_RESPONSE = loadJsonFile("agent-info-with-additional-metric-tags.json") + static final String INFO_WITH_ADDITIONAL_METRIC_TAGS_STATE = Strings.sha256(INFO_WITH_ADDITIONAL_METRIC_TAGS_RESPONSE) static final String INFO_WITH_CLIENT_DROPPING_RESPONSE = loadJsonFile("agent-info-with-client-dropping.json") static final String INFO_WITH_CLIENT_DROPPING_STATE = Strings.sha256(INFO_WITH_CLIENT_DROPPING_RESPONSE) static final String INFO_WITHOUT_METRICS_RESPONSE = loadJsonFile("agent-info-without-metrics.json") @@ -507,6 +509,20 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification { ) } + def "test parse /info response with additional-metric-tags"() { + setup: + OkHttpClient client = Mock(OkHttpClient) + DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true) + + when: "/info available with additional-metric-tags" + features.discover() + + then: + 1 * client.newCall(_) >> { Request request -> infoResponse(request, INFO_WITH_ADDITIONAL_METRIC_TAGS_RESPONSE) } + features.state() == INFO_WITH_ADDITIONAL_METRIC_TAGS_STATE + features.additionalMetricTags() == ["region", "priority"] as Set + } + def "test metrics disabled for agent version below 7.65"() { setup: OkHttpClient client = Mock(OkHttpClient) diff --git a/communication/src/test/resources/agent-features/agent-info-with-additional-metric-tags.json b/communication/src/test/resources/agent-features/agent-info-with-additional-metric-tags.json new file mode 100644 index 00000000000..af971f01991 --- /dev/null +++ b/communication/src/test/resources/agent-features/agent-info-with-additional-metric-tags.json @@ -0,0 +1,70 @@ +{ + "version": "7.67.0", + "git_commit": "bdf863ccc9", + "endpoints": [ + "/v0.3/traces", + "/v0.3/services", + "/v0.4/traces", + "/v0.4/services", + "/v0.5/traces", + "/v0.7/traces", + "/profiling/v1/input", + "/telemetry/proxy/", + "/v0.6/stats", + "/v0.1/pipeline_stats", + "/openlineage/api/v1/lineage", + "/evp_proxy/v1/", + "/evp_proxy/v2/", + "/evp_proxy/v3/", + "/evp_proxy/v4/", + "/debugger/v1/input", + "/debugger/v1/diagnostics", + "/symdb/v1/input", + "/dogstatsd/v1/proxy", + "/dogstatsd/v2/proxy", + "/tracer_flare/v1", + "/config/set" + ], + "client_drop_p0s": true, + "span_meta_structs": true, + "long_running_spans": true, + "span_events": true, + "config": { + "default_env": "prod", + "target_tps": 10, + "max_eps": 200, + "receiver_port": 8127, + "receiver_socket": "/var/run/datadog/apm.socket", + "connection_limit": 12, + "receiver_timeout": 100, + "max_request_bytes": 26214400, + "statsd_port": 8125, + "max_memory": 0, + "max_cpu": 0, + "analyzed_spans_by_service": {}, + "obfuscation": { + "elastic_search": true, + "mongo": true, + "sql_exec_plan": true, + "sql_exec_plan_normalize": true, + "http": { + "remove_query_string": true, + "remove_path_digits": true + }, + "remove_stack_traces": false, + "redis": true, + "memcached": false + } + }, + "span_derived_primary_tags": [ + "region", + "priority" + ], + "span_kinds_stats_computed": [ + "server", + "consumer", + "client", + "producer" + ], + "obfuscation_version": 1 +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 010993efe50..498acef47b7 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -41,6 +41,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -80,6 +81,16 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve Pair.of( DDCaches.newFixedSizeCache(512), value -> UTF8BytesString.create(key + ":" + value)); + private static final DDCache< + String, Pair, Function>> + SPAN_DERIVED_PRIMARY_TAGS_CACHE = DDCaches.newFixedSizeCache(64); + private static final Function< + String, Pair, Function>> + SPAN_DERIVED_PRIMARY_TAGS_CACHE_ADDER = + key -> + Pair.of( + DDCaches.newFixedSizeCache(512), + value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; private static final Set ELIGIBLE_SPAN_KINDS_FOR_METRICS = @@ -347,6 +358,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel, CharSequence spanK SPAN_KINDS.computeIfAbsent( spanKind, UTF8BytesString::create), // save repeated utf8 conversions getPeerTags(span, spanKind.toString()), + getAdditionalMetricTags(span), httpMethod, httpEndpoint, grpcStatusCode); @@ -412,6 +424,35 @@ private List getPeerTags(CoreSpan span, String spanKind) { return Collections.emptyList(); } + // TODO: This method is very similar to getPeerTags. We can probably consolidate to a helper. + private List getAdditionalMetricTags(CoreSpan span) { + Set additionalMetricTags = features.additionalMetricTags(); + if (additionalMetricTags == null || additionalMetricTags.isEmpty()) { + return Collections.emptyList(); + } + List tagValues = new ArrayList<>(additionalMetricTags.size()); + for (String tagKey : additionalMetricTags) { + Object value = span.unsafeGetTag(tagKey); + if (value != null) { + final Pair, Function> + cacheAndCreator = + SPAN_DERIVED_PRIMARY_TAGS_CACHE.computeIfAbsent( + tagKey, SPAN_DERIVED_PRIMARY_TAGS_CACHE_ADDER); + tagValues.add( + cacheAndCreator + .getLeft() + .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); + } + } + if (tagValues.isEmpty()) { + return Collections.emptyList(); + } + if (tagValues.size() > 1) { + tagValues.sort(Comparator.comparing(UTF8BytesString::toString)); + } + return tagValues; + } + private static boolean isSynthetic(CoreSpan span) { return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java index 9e2e2098d1f..1cb59367045 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java @@ -36,6 +36,7 @@ public final class MetricKey { private final boolean isTraceRoot; private final UTF8BytesString spanKind; private final List peerTags; + private final List additionalMetricTags; private final UTF8BytesString httpMethod; private final UTF8BytesString httpEndpoint; private final UTF8BytesString grpcStatusCode; @@ -54,6 +55,39 @@ public MetricKey( CharSequence httpMethod, CharSequence httpEndpoint, CharSequence grpcStatusCode) { + this( + resource, + service, + operationName, + serviceSource, + type, + httpStatusCode, + synthetics, + isTraceRoot, + spanKind, + peerTags, + Collections.emptyList(), + httpMethod, + httpEndpoint, + grpcStatusCode); + } + + // TODO: Should we keep one constructor? We'd need to refactor all of the old calls. + public MetricKey( + CharSequence resource, + CharSequence service, + CharSequence operationName, + CharSequence serviceSource, + CharSequence type, + int httpStatusCode, + boolean synthetics, + boolean isTraceRoot, + CharSequence spanKind, + List peerTags, + List additionalMetricTags, + CharSequence httpMethod, + CharSequence httpEndpoint, + CharSequence grpcStatusCode) { this.resource = null == resource ? EMPTY : utf8(RESOURCE_CACHE, resource); this.service = null == service ? EMPTY : utf8(SERVICE_CACHE, service); this.serviceSource = null == serviceSource ? null : utf8(SERVICE_SOURCE_CACHE, serviceSource); @@ -64,6 +98,8 @@ public MetricKey( this.isTraceRoot = isTraceRoot; this.spanKind = null == spanKind ? EMPTY : utf8(KIND_CACHE, spanKind); this.peerTags = peerTags == null ? Collections.emptyList() : peerTags; + this.additionalMetricTags = + additionalMetricTags == null ? Collections.emptyList() : additionalMetricTags; this.httpMethod = httpMethod == null ? null : utf8(HTTP_METHOD_CACHE, httpMethod); this.httpEndpoint = httpEndpoint == null ? null : utf8(HTTP_ENDPOINT_CACHE, httpEndpoint); this.grpcStatusCode = @@ -73,6 +109,7 @@ public MetricKey( tmpHash = HashingUtils.addToHash(tmpHash, this.isTraceRoot); tmpHash = HashingUtils.addToHash(tmpHash, this.spanKind); tmpHash = HashingUtils.addToHash(tmpHash, this.peerTags); + tmpHash = HashingUtils.addToHash(tmpHash, this.additionalMetricTags); tmpHash = HashingUtils.addToHash(tmpHash, this.resource); tmpHash = HashingUtils.addToHash(tmpHash, this.service); tmpHash = HashingUtils.addToHash(tmpHash, this.operationName); @@ -134,6 +171,10 @@ public List getPeerTags() { return peerTags; } + public List getAdditionalMetricTags() { + return additionalMetricTags; + } + public UTF8BytesString getHttpMethod() { return httpMethod; } @@ -163,6 +204,7 @@ public boolean equals(Object o) { && isTraceRoot == metricKey.isTraceRoot && spanKind.equals(metricKey.spanKind) && peerTags.equals(metricKey.peerTags) + && additionalMetricTags.equals(metricKey.additionalMetricTags) && Objects.equals(serviceSource, metricKey.serviceSource) && Objects.equals(httpMethod, metricKey.httpMethod) && Objects.equals(httpEndpoint, metricKey.httpEndpoint) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 0f84964e9db..7a050f0139b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -41,6 +41,8 @@ public final class SerializingMetricWriter implements MetricWriter { private static final byte[] IS_TRACE_ROOT = "IsTraceRoot".getBytes(ISO_8859_1); private static final byte[] SPAN_KIND = "SpanKind".getBytes(ISO_8859_1); private static final byte[] PEER_TAGS = "PeerTags".getBytes(ISO_8859_1); + private static final byte[] SPAN_DERIVED_PRIMARY_TAGS = + "SpanDerivedPrimaryTags".getBytes(ISO_8859_1); private static final byte[] HTTP_METHOD = "HTTPMethod".getBytes(ISO_8859_1); private static final byte[] HTTP_ENDPOINT = "HTTPEndpoint".getBytes(ISO_8859_1); private static final byte[] GRPC_STATUS_CODE = "GRPCStatusCode".getBytes(ISO_8859_1); @@ -149,7 +151,7 @@ public void add(MetricKey key, AggregateMetric aggregate) { final boolean hasServiceSource = key.getServiceSource() != null; final boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null; final int mapSize = - 15 + 16 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) @@ -189,6 +191,14 @@ public void add(MetricKey key, AggregateMetric aggregate) { writer.writeUTF8(peerTag); } + writer.writeUTF8(SPAN_DERIVED_PRIMARY_TAGS); + final List additionalMetricTags = key.getAdditionalMetricTags(); + writer.startArray(additionalMetricTags.size()); + + for (UTF8BytesString additionalMetricTag : additionalMetricTags) { + writer.writeUTF8(additionalMetricTag); + } + if (hasServiceSource) { writer.writeUTF8(SERVICE_SOURCE); writer.writeUTF8(key.getServiceSource()); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index c32763d452b..9aaf76bf139 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -373,6 +373,148 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "server" | [] } + def "should create bucket for each set of additional-metric-tags"() { + setup: + MetricWriter writer = Mock(MetricWriter) + Sink sink = Stub(Sink) + DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) + features.supportsMetrics() >> true + features.peerTags() >> [] + features.additionalMetricTags() >> (["region", "priority"] as Set) + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) + aggregator.start() + + when: + CountDownLatch latch = new CountDownLatch(1) + aggregator.publish([ + new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, "server").setTag("region", "eu-west-1").setTag("priority", "high"), + new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, "server").setTag("region", "us-east-1").setTag("priority", "high") + ]) + aggregator.report() + def latchTriggered = latch.await(2, SECONDS) + + then: + latchTriggered + 1 * writer.startBucket(2, _, _) + 1 * writer.add( + new MetricKey( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + [UTF8BytesString.create("priority:high"), UTF8BytesString.create("region:eu-west-1")], + null, + null, + null + ), { AggregateMetric aggregateMetric -> + aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 + }) + 1 * writer.add( + new MetricKey( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + [UTF8BytesString.create("priority:high"), UTF8BytesString.create("region:us-east-1")], + null, + null, + null + ), { AggregateMetric aggregateMetric -> + aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 + }) + 1 * writer.finishBucket() >> { latch.countDown() } + + cleanup: + aggregator.close() + } + + def "should sort additional-metric-tags and keep empty buckets stable"() { + setup: + MetricWriter writer = Mock(MetricWriter) + Sink sink = Stub(Sink) + DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) + features.supportsMetrics() >> true + features.peerTags() >> [] + features.additionalMetricTags() >> new LinkedHashSet(["priority", "region"]) + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) + aggregator.start() + + when: + CountDownLatch latch = new CountDownLatch(1) + aggregator.publish([ + new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, "server").setTag("region", "eu-west-1").setTag("priority", "high"), + new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, "server"), + new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, "server") + ]) + aggregator.report() + def latchTriggered = latch.await(2, SECONDS) + + then: + latchTriggered + 1 * writer.startBucket(2, _, _) + 1 * writer.add( + new MetricKey( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + [UTF8BytesString.create("priority:high"), UTF8BytesString.create("region:eu-west-1")], + null, + null, + null + ), { AggregateMetric aggregateMetric -> + aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 + }) + 1 * writer.add( + new MetricKey( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + [], + null, + null, + null + ), { AggregateMetric aggregateMetric -> + aggregateMetric.getHitCount() == 2 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 200 + }) + 1 * writer.finishBucket() >> { latch.countDown() } + + cleanup: + aggregator.close() + } + def "measured spans do not contribute to top level count"() { setup: MetricWriter writer = Mock(MetricWriter) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index 3ff81de9851..2de01367f3e 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -111,6 +111,7 @@ class SerializingMetricWriterTest extends DDSpecification { true, "server", [], + [UTF8BytesString.create("priority:high"), UTF8BytesString.create("region:eu-west-1")], "GET", "/api/users/:id", null @@ -307,7 +308,7 @@ class SerializingMetricWriterTest extends DDSpecification { boolean hasHttpEndpoint = key.getHttpEndpoint() != null boolean hasServiceSource = key.getServiceSource() != null boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null - int expectedMapSize = 15 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) + (hasGrpcStatusCode ? 1 : 0) + int expectedMapSize = 16 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) + (hasGrpcStatusCode ? 1 : 0) assert metricMapSize == expectedMapSize int elementCount = 0 assert unpacker.unpackString() == "Name" @@ -342,6 +343,14 @@ class SerializingMetricWriterTest extends DDSpecification { assert unpackedPeerTag == key.getPeerTags()[i].toString() } ++elementCount + assert unpacker.unpackString() == "SpanDerivedPrimaryTags" + int additionalMetricTagsLength = unpacker.unpackArrayHeader() + assert additionalMetricTagsLength == key.getAdditionalMetricTags().size() + for (int i = 0; i < additionalMetricTagsLength; i++) { + def unpackedAdditionalMetricTag = unpacker.unpackString() + assert unpackedAdditionalMetricTag == key.getAdditionalMetricTags()[i].toString() + } + ++elementCount // Service source is only present when the service name has been overridden by the tracer if (hasServiceSource) { assert unpacker.unpackString() == "srv_src" @@ -457,6 +466,62 @@ class SerializingMetricWriterTest extends DDSpecification { sink.validatedInput() } + def "SpanDerivedPrimaryTags field is present in payload even when empty"() { + setup: + long startTime = MILLISECONDS.toNanos(System.currentTimeMillis()) + long duration = SECONDS.toNanos(10) + WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") + + def keyWithTags = new MetricKey( + "resource", + "service", + "operation", + null, + "type", + 200, + false, + false, + "server", + [], + [UTF8BytesString.create("priority:high"), UTF8BytesString.create("region:eu-west-1")], + null, + null, + null) + def keyWithoutTags = new MetricKey( + "resource", + "service", + "operation", + null, + "type", + 200, + false, + false, + "server", + [], + [], + null, + null, + null) + + def content = [ + Pair.of(keyWithTags, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), + Pair.of(keyWithoutTags, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))) + ] + + ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) + SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) + + when: + writer.startBucket(content.size(), startTime, duration) + for (Pair pair : content) { + writer.add(pair.getLeft(), pair.getRight()) + } + writer.finishBucket() + + then: + sink.validatedInput() + } + def "HTTPMethod and HTTPEndpoint fields are optional in payload"() { setup: long startTime = MILLISECONDS.toNanos(System.currentTimeMillis()) From 9d2e5b52691b5169fa15490242b24d62fc14c06c Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 31 Mar 2026 17:15:46 -0400 Subject: [PATCH 2/2] Update implementation to cut out agent --- .../ddagent/DDAgentFeaturesDiscovery.java | 12 --- .../DDAgentFeaturesDiscoveryTest.groovy | 16 ---- ...gent-info-with-additional-metric-tags.json | 70 ---------------- .../trace/api/config/GeneralConfig.java | 1 + .../metrics/ConflatingMetricsAggregator.java | 84 ++++++++++++++++++- .../ConflatingMetricAggregatorTest.groovy | 4 +- .../main/java/datadog/trace/api/Config.java | 7 ++ .../datadog/trace/api/ConfigTest.groovy | 24 ++++++ metadata/supported-configurations.json | 8 ++ 9 files changed, 125 insertions(+), 101 deletions(-) delete mode 100644 communication/src/test/resources/agent-features/agent-info-with-additional-metric-tags.json diff --git a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java index 33075891145..755094cc2e4 100644 --- a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java +++ b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.security.NoSuchAlgorithmException; import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -99,7 +98,6 @@ private static class State { String version; String telemetryProxyEndpoint; Set peerTags = emptySet(); - Set additionalMetricTags = emptySet(); long lastTimeDiscovered; } @@ -310,12 +308,6 @@ private boolean processInfoResponse(State newState, String response) { peer_tags instanceof List ? unmodifiableSet(new HashSet<>((List) peer_tags)) : emptySet(); - - Object span_derived_primary_tags = map.get("span_derived_primary_tags"); - newState.additionalMetricTags = - span_derived_primary_tags instanceof List - ? unmodifiableSet(new LinkedHashSet<>((List) span_derived_primary_tags)) - : emptySet(); } try { newState.state = Strings.sha256(response); @@ -404,10 +396,6 @@ public Set peerTags() { return discoveryState.peerTags; } - public Set additionalMetricTags() { - return discoveryState.additionalMetricTags; - } - public String getMetricsEndpoint() { return discoveryState.metricsEndpoint; } diff --git a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy index ac0f7f308fa..505595e55e7 100644 --- a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy +++ b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy @@ -37,8 +37,6 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification { static final String INFO_STATE = Strings.sha256(INFO_RESPONSE) static final String INFO_WITH_PEER_TAG_BACK_PROPAGATION_RESPONSE = loadJsonFile("agent-info-with-peer-tag-back-propagation.json") static final String INFO_WITH_PEER_TAG_BACK_PROPAGATION_STATE = Strings.sha256(INFO_WITH_PEER_TAG_BACK_PROPAGATION_RESPONSE) - static final String INFO_WITH_ADDITIONAL_METRIC_TAGS_RESPONSE = loadJsonFile("agent-info-with-additional-metric-tags.json") - static final String INFO_WITH_ADDITIONAL_METRIC_TAGS_STATE = Strings.sha256(INFO_WITH_ADDITIONAL_METRIC_TAGS_RESPONSE) static final String INFO_WITH_CLIENT_DROPPING_RESPONSE = loadJsonFile("agent-info-with-client-dropping.json") static final String INFO_WITH_CLIENT_DROPPING_STATE = Strings.sha256(INFO_WITH_CLIENT_DROPPING_RESPONSE) static final String INFO_WITHOUT_METRICS_RESPONSE = loadJsonFile("agent-info-without-metrics.json") @@ -509,20 +507,6 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification { ) } - def "test parse /info response with additional-metric-tags"() { - setup: - OkHttpClient client = Mock(OkHttpClient) - DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true) - - when: "/info available with additional-metric-tags" - features.discover() - - then: - 1 * client.newCall(_) >> { Request request -> infoResponse(request, INFO_WITH_ADDITIONAL_METRIC_TAGS_RESPONSE) } - features.state() == INFO_WITH_ADDITIONAL_METRIC_TAGS_STATE - features.additionalMetricTags() == ["region", "priority"] as Set - } - def "test metrics disabled for agent version below 7.65"() { setup: OkHttpClient client = Mock(OkHttpClient) diff --git a/communication/src/test/resources/agent-features/agent-info-with-additional-metric-tags.json b/communication/src/test/resources/agent-features/agent-info-with-additional-metric-tags.json deleted file mode 100644 index af971f01991..00000000000 --- a/communication/src/test/resources/agent-features/agent-info-with-additional-metric-tags.json +++ /dev/null @@ -1,70 +0,0 @@ -{ - "version": "7.67.0", - "git_commit": "bdf863ccc9", - "endpoints": [ - "/v0.3/traces", - "/v0.3/services", - "/v0.4/traces", - "/v0.4/services", - "/v0.5/traces", - "/v0.7/traces", - "/profiling/v1/input", - "/telemetry/proxy/", - "/v0.6/stats", - "/v0.1/pipeline_stats", - "/openlineage/api/v1/lineage", - "/evp_proxy/v1/", - "/evp_proxy/v2/", - "/evp_proxy/v3/", - "/evp_proxy/v4/", - "/debugger/v1/input", - "/debugger/v1/diagnostics", - "/symdb/v1/input", - "/dogstatsd/v1/proxy", - "/dogstatsd/v2/proxy", - "/tracer_flare/v1", - "/config/set" - ], - "client_drop_p0s": true, - "span_meta_structs": true, - "long_running_spans": true, - "span_events": true, - "config": { - "default_env": "prod", - "target_tps": 10, - "max_eps": 200, - "receiver_port": 8127, - "receiver_socket": "/var/run/datadog/apm.socket", - "connection_limit": 12, - "receiver_timeout": 100, - "max_request_bytes": 26214400, - "statsd_port": 8125, - "max_memory": 0, - "max_cpu": 0, - "analyzed_spans_by_service": {}, - "obfuscation": { - "elastic_search": true, - "mongo": true, - "sql_exec_plan": true, - "sql_exec_plan_normalize": true, - "http": { - "remove_query_string": true, - "remove_path_digits": true - }, - "remove_stack_traces": false, - "redis": true, - "memcached": false - } - }, - "span_derived_primary_tags": [ - "region", - "priority" - ], - "span_kinds_stats_computed": [ - "server", - "consumer", - "client", - "producer" - ], - "obfuscation_version": 1 -} diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java index 60e94418f09..15b6a811d4e 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java @@ -70,6 +70,7 @@ public final class GeneralConfig { "trace.tracer.metrics.buffering.enabled"; public static final String TRACER_METRICS_MAX_AGGREGATES = "trace.tracer.metrics.max.aggregates"; public static final String TRACER_METRICS_MAX_PENDING = "trace.tracer.metrics.max.pending"; + public static final String APM_ADDITIONAL_METRIC_TAGS = "apm.additional.metric.tags"; public static final String TRACER_METRICS_IGNORED_RESOURCES = "trace.tracer.metrics.ignored.resources"; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 498acef47b7..50fcfe81c8b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -114,11 +114,13 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private final long reportingInterval; private final TimeUnit reportingIntervalTimeUnit; private final DDAgentFeaturesDiscovery features; + private final Set additionalMetricTags; private final HealthMetrics healthMetrics; private final boolean includeEndpointInMetrics; private volatile AgentTaskScheduler.Scheduled cancellation; + // TODO: Refactor to one / fewer constructors? public ConflatingMetricsAggregator( Config config, SharedCommunicationObjects sharedCommunicationObjects, @@ -126,6 +128,7 @@ public ConflatingMetricsAggregator( this( config.getWellKnownTags(), config.getMetricsIgnoredResources(), + config.getAdditionalMetricTags(), sharedCommunicationObjects.featuresDiscovery(config), healthMetrics, new OkHttpSink( @@ -152,6 +155,7 @@ public ConflatingMetricsAggregator( this( wellKnownTags, ignoredResources, + Collections.emptySet(), features, healthMetric, sink, @@ -174,7 +178,58 @@ public ConflatingMetricsAggregator( TimeUnit timeUnit, boolean includeEndpointInMetrics) { this( + wellKnownTags, ignoredResources, + Collections.emptySet(), + features, + healthMetric, + sink, + maxAggregates, + queueSize, + reportingInterval, + timeUnit, + includeEndpointInMetrics); + } + + ConflatingMetricsAggregator( + WellKnownTags wellKnownTags, + Set ignoredResources, + Set additionalMetricTags, + DDAgentFeaturesDiscovery features, + HealthMetrics healthMetric, + Sink sink, + int maxAggregates, + int queueSize, + boolean includeEndpointInMetrics) { + this( + wellKnownTags, + ignoredResources, + additionalMetricTags, + features, + healthMetric, + sink, + maxAggregates, + queueSize, + 10, + SECONDS, + includeEndpointInMetrics); + } + + ConflatingMetricsAggregator( + WellKnownTags wellKnownTags, + Set ignoredResources, + Set additionalMetricTags, + DDAgentFeaturesDiscovery features, + HealthMetrics healthMetric, + Sink sink, + int maxAggregates, + int queueSize, + long reportingInterval, + TimeUnit timeUnit, + boolean includeEndpointInMetrics) { + this( + ignoredResources, + additionalMetricTags, features, healthMetric, sink, @@ -197,7 +252,35 @@ public ConflatingMetricsAggregator( long reportingInterval, TimeUnit timeUnit, boolean includeEndpointInMetrics) { + this( + ignoredResources, + Collections.emptySet(), + features, + healthMetric, + sink, + metricWriter, + maxAggregates, + queueSize, + reportingInterval, + timeUnit, + includeEndpointInMetrics); + } + + ConflatingMetricsAggregator( + Set ignoredResources, + Set additionalMetricTags, + DDAgentFeaturesDiscovery features, + HealthMetrics healthMetric, + Sink sink, + MetricWriter metricWriter, + int maxAggregates, + int queueSize, + long reportingInterval, + TimeUnit timeUnit, + boolean includeEndpointInMetrics) { this.ignoredResources = ignoredResources; + this.additionalMetricTags = + additionalMetricTags == null ? Collections.emptySet() : additionalMetricTags; this.includeEndpointInMetrics = includeEndpointInMetrics; this.inbox = Queues.mpscArrayQueue(queueSize); this.batchPool = Queues.spmcArrayQueue(maxAggregates); @@ -426,7 +509,6 @@ private List getPeerTags(CoreSpan span, String spanKind) { // TODO: This method is very similar to getPeerTags. We can probably consolidate to a helper. private List getAdditionalMetricTags(CoreSpan span) { - Set additionalMetricTags = features.additionalMetricTags(); if (additionalMetricTags == null || additionalMetricTags.isEmpty()) { return Collections.emptyList(); } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 9aaf76bf139..799f0bbb1ee 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -380,8 +380,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - features.additionalMetricTags() >> (["region", "priority"] as Set) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ["region", "priority"] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -450,8 +450,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - features.additionalMetricTags() >> new LinkedHashSet(["priority", "region"]) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + new LinkedHashSet(["priority", "region"]), features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index fcaac7a9b55..b0b5ad4184c 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -346,6 +346,7 @@ import static datadog.trace.api.config.GeneralConfig.AGENTLESS_LOG_SUBMISSION_URL; import static datadog.trace.api.config.GeneralConfig.API_KEY; import static datadog.trace.api.config.GeneralConfig.API_KEY_FILE; +import static datadog.trace.api.config.GeneralConfig.APM_ADDITIONAL_METRIC_TAGS; import static datadog.trace.api.config.GeneralConfig.APPLICATION_KEY; import static datadog.trace.api.config.GeneralConfig.APPLICATION_KEY_FILE; import static datadog.trace.api.config.GeneralConfig.APP_KEY; @@ -952,6 +953,7 @@ public static String getHostName() { private final boolean tracerMetricsBufferingEnabled; private final int tracerMetricsMaxAggregates; private final int tracerMetricsMaxPending; + private final Set additionalMetricTags; private final boolean reportHostName; @@ -2053,6 +2055,7 @@ private Config(final ConfigProvider configProvider, final InstrumenterConfig ins configProvider.getBoolean(TRACER_METRICS_BUFFERING_ENABLED, false); tracerMetricsMaxAggregates = configProvider.getInteger(TRACER_METRICS_MAX_AGGREGATES, 2048); tracerMetricsMaxPending = configProvider.getInteger(TRACER_METRICS_MAX_PENDING, 2048); + additionalMetricTags = tryMakeImmutableSet(configProvider.getList(APM_ADDITIONAL_METRIC_TAGS)); reportHostName = configProvider.getBoolean(TRACE_REPORT_HOSTNAME, DEFAULT_TRACE_REPORT_HOSTNAME); @@ -3600,6 +3603,10 @@ public int getTracerMetricsMaxPending() { return tracerMetricsMaxPending; } + public Set getAdditionalMetricTags() { + return additionalMetricTags; + } + public boolean isLogsInjectionEnabled() { return logsInjectionEnabled; } diff --git a/internal-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy index faa4d04311e..6aa71231f50 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy @@ -38,6 +38,7 @@ import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_VE import static datadog.trace.api.config.DebuggerConfig.EXCEPTION_REPLAY_ENABLED import static datadog.trace.api.config.GeneralConfig.API_KEY import static datadog.trace.api.config.GeneralConfig.API_KEY_FILE +import static datadog.trace.api.config.GeneralConfig.APM_ADDITIONAL_METRIC_TAGS import static datadog.trace.api.config.GeneralConfig.CONFIGURATION_FILE import static datadog.trace.api.config.GeneralConfig.ENV import static datadog.trace.api.config.GeneralConfig.GLOBAL_TAGS @@ -161,6 +162,7 @@ import datadog.trace.util.throwable.FatalAgentMisconfigurationError class ConfigTest extends DDSpecification { private static final String PREFIX = "dd." private static final DD_API_KEY_ENV = "DD_API_KEY" + private static final DD_APM_ADDITIONAL_METRIC_TAGS_ENV = "DD_APM_ADDITIONAL_METRIC_TAGS" private static final DD_SERVICE_NAME_ENV = "DD_SERVICE_NAME" private static final DD_TRACE_ENABLED_ENV = "DD_TRACE_ENABLED" private static final DD_WRITER_TYPE_ENV = "DD_WRITER_TYPE" @@ -2466,6 +2468,28 @@ class ConfigTest extends DDSpecification { config.getMetricsIgnoredResources() == ["GET /healthcheck", "SELECT foo from bar"].toSet() } + def "test get additional metric tags from system property"() { + setup: + System.setProperty(PREFIX + APM_ADDITIONAL_METRIC_TAGS, "region, priority, region") + + when: + def config = new Config() + + then: + config.getAdditionalMetricTags() == ["region", "priority"] as Set + } + + def "test get additional metric tags from env var"() { + setup: + environmentVariables.set(DD_APM_ADDITIONAL_METRIC_TAGS_ENV, "region, priority, region") + + when: + def config = new Config() + + then: + config.getAdditionalMetricTags() == ["region", "priority"] as Set + } + def "appsec state with sys = #sys env = #env"() { setup: if (sys != null) { diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index 6b00677813f..6fed3a8c228 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -185,6 +185,14 @@ "aliases": [] } ], + "DD_APM_ADDITIONAL_METRIC_TAGS": [ + { + "version": "A", + "type": "string", + "default": null, + "aliases": [] + } + ], "DD_APM_TRACING_ENABLED": [ { "version": "A",