From 9cd3b0fc5f59c2c86c3b5e2483fd663d8ca2335a Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 21 May 2026 12:18:37 -0600 Subject: [PATCH 1/2] Report Kafka client connection status in DSM payloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, the DSM payload only carried Kafka client configs once `Metadata.update` fired with a valid cluster ID — so clients that never authenticated or never reached a broker were silently dropped, and we couldn't compare their configs against working clients. Now every config is also reported with a `connectionStatus` field ("connected" / "failed") on the per-bucket `Configs` entry, including on `Metadata.failedUpdate`. Also expands the value-allowlist with non-secret auth selectors (`sasl.mechanism`, `ssl.protocol`, `ssl.endpoint.identification.algorithm`, etc.) so the comparison flow can surface mechanism typos without leaking credentials. tag: ai generated Co-Authored-By: Claude Opus 4.7 (1M context) --- .../MetadataInstrumentation.java | 18 ++++ .../MetadataInstrumentation.java | 2 + .../MetadataFailedUpdateAdvice.java | 23 +++++ .../kafka_common/KafkaConfigHelper.java | 30 ++++++- .../kafka_common/PendingConfig.java | 5 +- .../DefaultDataStreamsMonitoring.java | 7 +- .../MsgPackDatastreamsPayloadWriter.java | 6 +- .../datastreams/DataStreamsWritingTest.java | 16 ++-- .../DefaultDataStreamsMonitoringTest.java | 83 ++++++++++++++----- .../AgentDataStreamsMonitoring.java | 9 +- .../api/datastreams/KafkaConfigReport.java | 11 ++- .../NoopDataStreamsMonitoring.java | 6 +- 12 files changed, 182 insertions(+), 34 deletions(-) create mode 100644 dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataFailedUpdateAdvice.java diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java index d6acfe30369..ed95968312f 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java @@ -80,6 +80,9 @@ public void methodAdvice(MethodTransformer transformer) { .and(named("update")) .and(takesArgument(1, named("org.apache.kafka.common.requests.MetadataResponse"))), MetadataInstrumentation.class.getName() + "$MetadataUpdate22AndAfterAdvice"); + transformer.applyAdvice( + isMethod().and(named("failedUpdate")), + MetadataInstrumentation.class.getName() + "$FailedUpdateAdvice"); } public static class MetadataUpdateBefore22Advice { @@ -103,6 +106,21 @@ public static void muzzleCheck(ConsumerRecord record) { } } + public static class FailedUpdateAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.This final Metadata metadata) { + MetadataState state = + InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata); + if (state != null) { + KafkaConfigHelper.reportPendingConfigAsFailed(state); + } + } + + public static void muzzleCheck(ConsumerRecord record) { + record.headers(); + } + } + public static class MetadataUpdate22AndAfterAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java index 3907ad0c18c..a29b8b69f94 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java @@ -66,5 +66,7 @@ public void methodAdvice(MethodTransformer transformer) { .and(named("update")) .and(takesArgument(1, named("org.apache.kafka.common.requests.MetadataResponse"))), packageName + ".MetadataUpdate22AndAfterAdvice"); + transformer.applyAdvice( + isMethod().and(named("failedUpdate")), packageName + ".MetadataFailedUpdateAdvice"); } } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataFailedUpdateAdvice.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataFailedUpdateAdvice.java new file mode 100644 index 00000000000..5e0b4c7ceac --- /dev/null +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataFailedUpdateAdvice.java @@ -0,0 +1,23 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper; +import datadog.trace.instrumentation.kafka_common.MetadataState; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public class MetadataFailedUpdateAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.This final Metadata metadata) { + MetadataState state = + InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata); + if (state != null) { + KafkaConfigHelper.reportPendingConfigAsFailed(state); + } + } + + public static void muzzleCheck(ConsumerRecord record) { + record.headers(); + } +} diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java index b9c5089f70a..4d5e83d14e2 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java @@ -43,6 +43,17 @@ public class KafkaConfigHelper { "socket.connection.setup.timeout.ms", "socket.connection.setup.timeout.max.ms", "security.protocol", + // Non-secret auth selectors — values name the mechanism / algorithm in use, + // never carry credentials. Useful for spotting typos like SCRAM-SHA-256 vs -512. + "sasl.mechanism", + "sasl.kerberos.service.name", + "sasl.login.callback.handler.class", + "ssl.protocol", + "ssl.enabled.protocols", + "ssl.endpoint.identification.algorithm", + "ssl.truststore.type", + "ssl.keystore.type", + "ssl.cipher.suites", "metrics.sample.window.ms", "metrics.num.samples", "metrics.recording.level", @@ -107,13 +118,28 @@ public static void storePendingConsumerConfig( /** Called from metadata update advice when the cluster ID becomes available. */ public static void reportPendingConfig(MetadataState state, String clusterId) { + reportPendingConfig(state, clusterId, PendingConfig.STATUS_CONNECTED); + } + + /** Called from failure advice when the client cannot reach / authenticate to the cluster. */ + public static void reportPendingConfigAsFailed(MetadataState state) { + // clusterId may be unknown on auth/connect failure — emit with whatever we have (often "") + reportPendingConfig(state, state.clusterId, PendingConfig.STATUS_FAILED); + } + + private static void reportPendingConfig(MetadataState state, String clusterId, String status) { PendingConfig pending = state.takePendingConfig(); if (pending != null) { - log.debug("Received cluster ID, reporting {} config", pending.type); + log.debug("Reporting {} config with status={}", pending.type, status); if (Config.get().isDataStreamsEnabled()) { AgentTracer.get() .getDataStreamsMonitoring() - .reportKafkaConfig(pending.type, clusterId, pending.consumerGroup, pending.config); + .reportKafkaConfig( + pending.type, + clusterId != null ? clusterId : "", + pending.consumerGroup, + pending.config, + status); } } } diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/PendingConfig.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/PendingConfig.java index e0798d00ecf..f5eb143d6fb 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/PendingConfig.java +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/PendingConfig.java @@ -2,8 +2,11 @@ import java.util.Map; -/** Holds pending Kafka config info until the cluster ID becomes available from metadata. */ +/** Holds pending Kafka config info until the client's connection lifecycle resolves. */ public class PendingConfig { + public static final String STATUS_CONNECTED = "connected"; + public static final String STATUS_FAILED = "failed"; + public final String type; public final String consumerGroup; public final Map config; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 32db178bc1a..95146f2fe12 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -293,13 +293,18 @@ public void reportSchemaRegistryUsage( @Override public void reportKafkaConfig( - String type, String kafkaClusterId, String consumerGroup, Map config) { + String type, + String kafkaClusterId, + String consumerGroup, + Map config, + String connectionStatus) { inbox.offer( new KafkaConfigReport( type, kafkaClusterId, consumerGroup, config, + connectionStatus, timeSource.getCurrentTimeNanos(), getThreadServiceName())); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 6b7c5ad32ea..67700d55423 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -58,6 +58,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private static final byte[] CONFIG_KAFKA_CLUSTER_ID = "KafkaClusterId".getBytes(ISO_8859_1); private static final byte[] CONFIG_CONSUMER_GROUP = "ConsumerGroup".getBytes(ISO_8859_1); private static final byte[] CONFIG_ENTRIES = "Config".getBytes(ISO_8859_1); + private static final byte[] CONFIG_CONNECTION_STATUS = "ConnectionStatus".getBytes(ISO_8859_1); private static final int INITIAL_CAPACITY = 512 * 1024; @@ -290,7 +291,7 @@ private void writeKafkaConfigs(List configs, Writable packer) packer.writeUTF8(CONFIGS); packer.startArray(configs.size()); for (KafkaConfigReport config : configs) { - packer.startMap(4); // Type, KafkaClusterId, ConsumerGroup, Config + packer.startMap(5); // Type, KafkaClusterId, ConsumerGroup, ConnectionStatus, Config packer.writeUTF8(CONFIG_TYPE); packer.writeString(config.getType(), null); @@ -301,6 +302,9 @@ private void writeKafkaConfigs(List configs, Writable packer) packer.writeUTF8(CONFIG_CONSUMER_GROUP); packer.writeString(config.getConsumerGroup(), null); + packer.writeUTF8(CONFIG_CONNECTION_STATUS); + packer.writeString(config.getConnectionStatus(), null); + packer.writeUTF8(CONFIG_ENTRIES); Map entries = config.getConfig(); packer.startMap(entries.size()); diff --git a/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DataStreamsWritingTest.java b/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DataStreamsWritingTest.java index 52ad1a11447..4aaf1db72bb 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DataStreamsWritingTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DataStreamsWritingTest.java @@ -296,13 +296,13 @@ void writeKafkaConfigsToMockServer() throws InterruptedException, IOException { producerConfig.put("bootstrap.servers", "localhost:9092"); producerConfig.put("acks", "all"); producerConfig.put("linger.ms", "5"); - dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig); + dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig, "connected"); Map consumerConfig = new HashMap<>(); consumerConfig.put("bootstrap.servers", "localhost:9092"); consumerConfig.put("group.id", "test-group"); consumerConfig.put("auto.offset.reset", "earliest"); - dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", consumerConfig); + dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", consumerConfig, "connected"); // Also add a stats point so the bucket is not empty of stats dataStreams.add( @@ -355,8 +355,8 @@ void duplicateKafkaConfigsAreEachSerializedInPayload() throws InterruptedExcepti Map producerConfig = new HashMap<>(); producerConfig.put("bootstrap.servers", "localhost:9092"); producerConfig.put("acks", "all"); - dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig); - dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig); + dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig, "connected"); + dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig, "connected"); // Also add a stats point so the bucket has content dataStreams.add( @@ -407,13 +407,15 @@ private void validateKafkaConfigMessage(byte[] message) throws IOException { // Collect configs in a map keyed by type Map> configsByType = new HashMap<>(); for (int n = 0; n < numConfigs; n++) { - assertEquals(4, unpacker.unpackMapHeader()); + assertEquals(5, unpacker.unpackMapHeader()); assertEquals("Type", unpacker.unpackString()); String type = unpacker.unpackString(); assertEquals("KafkaClusterId", unpacker.unpackString()); unpacker.unpackString(); // skip cluster id value assertEquals("ConsumerGroup", unpacker.unpackString()); unpacker.unpackString(); // skip consumer group value + assertEquals("ConnectionStatus", unpacker.unpackString()); + assertEquals("connected", unpacker.unpackString()); assertEquals("Config", unpacker.unpackString()); int configSize = unpacker.unpackMapHeader(); Map configEntries = new HashMap<>(); @@ -481,13 +483,15 @@ private void validateDuplicateKafkaConfigMessage(byte[] message) throws IOExcept assertEquals(2, numConfigs); for (int n = 0; n < numConfigs; n++) { - assertEquals(4, unpacker.unpackMapHeader()); + assertEquals(5, unpacker.unpackMapHeader()); assertEquals("Type", unpacker.unpackString()); assertEquals("kafka_producer", unpacker.unpackString()); assertEquals("KafkaClusterId", unpacker.unpackString()); unpacker.unpackString(); // skip cluster id value assertEquals("ConsumerGroup", unpacker.unpackString()); unpacker.unpackString(); // skip consumer group value + assertEquals("ConnectionStatus", unpacker.unpackString()); + assertEquals("connected", unpacker.unpackString()); assertEquals("Config", unpacker.unpackString()); int configSize = unpacker.unpackMapHeader(); Map configEntries = new HashMap<>(); diff --git a/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java b/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java index c1c29026c49..6148aa6c7fc 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java @@ -1087,7 +1087,7 @@ void kafkaProducerConfigIsReportedInBucket() throws InterruptedException { Map kafkaConfig = new HashMap<>(); kafkaConfig.put("bootstrap.servers", "localhost:9092"); kafkaConfig.put("acks", "all"); - dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig); + dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig, "connected"); timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS); dataStreams.report(); @@ -1100,6 +1100,48 @@ void kafkaProducerConfigIsReportedInBucket() throws InterruptedException { assertEquals("kafka_producer", report.getType()); assertEquals("localhost:9092", report.getConfig().get("bootstrap.servers")); assertEquals("all", report.getConfig().get("acks")); + assertEquals("connected", report.getConnectionStatus()); + + // cleanup + payloadWriter.close(); + dataStreams.close(); + } + + @Test + void kafkaProducerConfigWithFailedStatusIsReportedInBucket() throws InterruptedException { + DDAgentFeaturesDiscovery features = stubFeatures(true); + ControllableTimeSource timeSource = new ControllableTimeSource(); + Sink sink = mock(Sink.class); + CapturingPayloadWriter payloadWriter = new CapturingPayloadWriter(); + TraceConfig traceConfig = stubTraceConfig(true); + + DefaultDataStreamsMonitoring dataStreams = + new DefaultDataStreamsMonitoring( + sink, + features, + timeSource, + () -> traceConfig, + payloadWriter, + DEFAULT_BUCKET_DURATION_NANOS); + dataStreams.start(); + Map kafkaConfig = new HashMap<>(); + kafkaConfig.put("bootstrap.servers", "localhost:9092"); + kafkaConfig.put("sasl.mechanism", "SCRAM-SHA-512"); + // clusterId is empty when the client never reached the broker + dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig, "failed"); + timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS); + dataStreams.report(); + + awaitBuckets(dataStreams, payloadWriter, 1); + + StatsBucket bucket = payloadWriter.buckets.get(0); + List kafkaConfigs = bucket.getKafkaConfigs(); + assertEquals(1, kafkaConfigs.size()); + KafkaConfigReport report = kafkaConfigs.get(0); + assertEquals("kafka_producer", report.getType()); + assertEquals("", report.getKafkaClusterId()); + assertEquals("failed", report.getConnectionStatus()); + assertEquals("SCRAM-SHA-512", report.getConfig().get("sasl.mechanism")); // cleanup payloadWriter.close(); @@ -1127,7 +1169,7 @@ void kafkaConsumerConfigIsReportedInBucket() throws InterruptedException { kafkaConfig.put("bootstrap.servers", "localhost:9092"); kafkaConfig.put("group.id", "test-group"); kafkaConfig.put("auto.offset.reset", "earliest"); - dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", kafkaConfig); + dataStreams.reportKafkaConfig("kafka_consumer", "", "test-group", kafkaConfig, "connected"); timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS); dataStreams.report(); @@ -1171,8 +1213,8 @@ void duplicateKafkaConfigsAreEachReportedInTheBucket() throws InterruptedExcepti Map config2 = new HashMap<>(); config2.put("bootstrap.servers", "localhost:9092"); config2.put("acks", "all"); - dataStreams.reportKafkaConfig("kafka_producer", "", "", config1); - dataStreams.reportKafkaConfig("kafka_producer", "", "", config2); + dataStreams.reportKafkaConfig("kafka_producer", "", "", config1, "connected"); + dataStreams.reportKafkaConfig("kafka_producer", "", "", config2, "connected"); timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS); dataStreams.report(); @@ -1214,7 +1256,7 @@ void kafkaConfigsReportedInSeparateBucketsAppearInEachBucket() throws Interrupte Map kafkaConfig = new HashMap<>(); kafkaConfig.put("bootstrap.servers", "localhost:9092"); kafkaConfig.put("acks", "all"); - dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig); + dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig, "connected"); timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS); dataStreams.report(); @@ -1224,7 +1266,7 @@ void kafkaConfigsReportedInSeparateBucketsAppearInEachBucket() throws Interrupte // reporting the same config again in a new bucket payloadWriter.buckets.clear(); - dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig); + dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig, "connected"); timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS); dataStreams.report(); @@ -1265,11 +1307,11 @@ void differentKafkaConfigsAreBothReported() throws InterruptedException { Map producerConfig = new HashMap<>(); producerConfig.put("bootstrap.servers", "localhost:9092"); producerConfig.put("acks", "all"); - dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig); + dataStreams.reportKafkaConfig("kafka_producer", "", "", producerConfig, "connected"); Map consumerConfig = new HashMap<>(); consumerConfig.put("bootstrap.servers", "localhost:9092"); consumerConfig.put("group.id", "my-group"); - dataStreams.reportKafkaConfig("kafka_consumer", "", "my-group", consumerConfig); + dataStreams.reportKafkaConfig("kafka_consumer", "", "my-group", consumerConfig, "connected"); timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS); dataStreams.report(); @@ -1325,11 +1367,11 @@ void kafkaConfigsWithDifferentValuesForSameTypeAreNotDeduplicated() throws Inter Map config1 = new HashMap<>(); config1.put("bootstrap.servers", "localhost:9092"); config1.put("acks", "all"); - dataStreams.reportKafkaConfig("kafka_producer", "", "", config1); + dataStreams.reportKafkaConfig("kafka_producer", "", "", config1, "connected"); Map config2 = new HashMap<>(); config2.put("bootstrap.servers", "localhost:9093"); config2.put("acks", "1"); - dataStreams.reportKafkaConfig("kafka_producer", "", "", config2); + dataStreams.reportKafkaConfig("kafka_producer", "", "", config2, "connected"); timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS); dataStreams.report(); @@ -1364,7 +1406,7 @@ void kafkaConfigsAreReportedAlongsideStatsPoints() throws InterruptedException { dataStreams.add(new StatsPoint(tags, 1, 2, 3, timeSource.getCurrentTimeNanos(), 0, 0, 0, null)); Map kafkaConfig = new HashMap<>(); kafkaConfig.put("bootstrap.servers", "localhost:9092"); - dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig); + dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig, "connected"); timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS); dataStreams.report(); @@ -1415,7 +1457,7 @@ void kafkaConfigsNotReportedWhenDSMIsDisabled(boolean enabledAtAgent, boolean en dataStreams.start(); Map kafkaConfig = new HashMap<>(); kafkaConfig.put("bootstrap.servers", "localhost:9092"); - dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig); + dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig, "connected"); timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS); dataStreams.report(); @@ -1446,7 +1488,7 @@ void kafkaConfigsFlushedOnClose() throws InterruptedException { dataStreams.start(); Map kafkaConfig = new HashMap<>(); kafkaConfig.put("bootstrap.servers", "localhost:9092"); - dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig); + dataStreams.reportKafkaConfig("kafka_producer", "", "", kafkaConfig, "connected"); timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100L); dataStreams.close(); @@ -1476,19 +1518,20 @@ void kafkaConfigReportEqualsAndHashCodeWorkCorrectly() { differentConfig.put("bootstrap.servers", "localhost:9093"); KafkaConfigReport config1 = - new KafkaConfigReport("kafka_producer", "", "", sameConfig, 1000L, null); + new KafkaConfigReport("kafka_producer", "", "", sameConfig, "connected", 1000L, null); // different timestamp KafkaConfigReport config2 = - new KafkaConfigReport("kafka_producer", "", "", sameConfig2, 2000L, null); + new KafkaConfigReport("kafka_producer", "", "", sameConfig2, "connected", 2000L, null); // different type KafkaConfigReport config3 = - new KafkaConfigReport("kafka_consumer", "", "", sameConfig, 1000L, null); + new KafkaConfigReport("kafka_consumer", "", "", sameConfig, "connected", 1000L, null); // different config values KafkaConfigReport config4 = - new KafkaConfigReport("kafka_producer", "", "", differentConfig, 1000L, null); + new KafkaConfigReport("kafka_producer", "", "", differentConfig, "connected", 1000L, null); // different serviceNameOverride KafkaConfigReport config5 = - new KafkaConfigReport("kafka_producer", "", "", sameConfig, 1000L, "other-service"); + new KafkaConfigReport( + "kafka_producer", "", "", sameConfig, "connected", 1000L, "other-service"); // Reflexive assertEquals(config1, config1); @@ -1526,11 +1569,11 @@ void statsBucketStoresKafkaConfigs() { Map producerCfg = new HashMap<>(); producerCfg.put("acks", "all"); KafkaConfigReport config1 = - new KafkaConfigReport("kafka_producer", "", "", producerCfg, 1000L, null); + new KafkaConfigReport("kafka_producer", "", "", producerCfg, "connected", 1000L, null); Map consumerCfg = new HashMap<>(); consumerCfg.put("group.id", "test"); KafkaConfigReport config2 = - new KafkaConfigReport("kafka_consumer", "", "test", consumerCfg, 2000L, null); + new KafkaConfigReport("kafka_consumer", "", "test", consumerCfg, "connected", 2000L, null); bucket.addKafkaConfig(config1); bucket.addKafkaConfig(config2); diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java index 42654dfda7c..5161e64ac5a 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java @@ -18,9 +18,16 @@ public interface AgentDataStreamsMonitoring * @param kafkaClusterId the Kafka cluster identifier, or empty string if not yet known * @param consumerGroup the consumer group name, or empty string for producers * @param config the configuration key-value pairs + * @param connectionStatus "connected" if the client successfully fetched metadata from a broker, + * "failed" if metadata fetch failed (e.g. auth error, broker unreachable). Empty string if + * unknown. */ void reportKafkaConfig( - String type, String kafkaClusterId, String consumerGroup, Map config); + String type, + String kafkaClusterId, + String consumerGroup, + Map config, + String connectionStatus); /** * Tracks Schema Registry usage for Data Streams Monitoring. diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/KafkaConfigReport.java b/internal-api/src/main/java/datadog/trace/api/datastreams/KafkaConfigReport.java index d940847e22f..c76d9dd4396 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/KafkaConfigReport.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/KafkaConfigReport.java @@ -12,6 +12,7 @@ public class KafkaConfigReport implements InboxItem { private final String kafkaClusterId; private final String consumerGroup; private final Map config; + private final String connectionStatus; // "connected" or "failed" private final long timestampNanos; private final String serviceNameOverride; @@ -20,12 +21,14 @@ public KafkaConfigReport( String kafkaClusterId, String consumerGroup, Map config, + String connectionStatus, long timestampNanos, String serviceNameOverride) { this.type = type; this.kafkaClusterId = kafkaClusterId != null ? kafkaClusterId : ""; this.consumerGroup = consumerGroup != null ? consumerGroup : ""; this.config = config; + this.connectionStatus = connectionStatus != null ? connectionStatus : ""; this.timestampNanos = timestampNanos; this.serviceNameOverride = serviceNameOverride; } @@ -46,6 +49,10 @@ public Map getConfig() { return config; } + public String getConnectionStatus() { + return connectionStatus; + } + public long getTimestampNanos() { return timestampNanos; } @@ -62,7 +69,8 @@ public boolean equals(Object o) { return Objects.equals(type, that.type) && Objects.equals(kafkaClusterId, that.kafkaClusterId) && Objects.equals(consumerGroup, that.consumerGroup) - && Objects.equals(config, that.config); + && Objects.equals(config, that.config) + && Objects.equals(connectionStatus, that.connectionStatus); } @Override @@ -71,6 +79,7 @@ public int hashCode() { result = 31 * result + (kafkaClusterId != null ? kafkaClusterId.hashCode() : 0); result = 31 * result + (consumerGroup != null ? consumerGroup.hashCode() : 0); result = 31 * result + (config != null ? config.hashCode() : 0); + result = 31 * result + (connectionStatus != null ? connectionStatus.hashCode() : 0); return result; } } diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java index 09bd3ee1e3a..c9657c60271 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java @@ -68,7 +68,11 @@ public void trackTransaction( @Override public void reportKafkaConfig( - String type, String kafkaClusterId, String consumerGroup, Map config) {} + String type, + String kafkaClusterId, + String consumerGroup, + Map config, + String connectionStatus) {} @Override public void setConsumeCheckpoint(String type, String source, DataStreamsContextCarrier carrier) {} From 054b71e532c14043e48448cb9750bbb36f5305dc Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Tue, 26 May 2026 14:04:34 -0600 Subject: [PATCH 2/2] Don't consume pending config on Kafka transient metadata failure reportPendingConfigAsFailed used to call takePendingConfig(), so a transient failedUpdate would permanently record the client as failed and prevent a later successful update from emitting "connected". Switch to a peek + one-shot flag: emit at most one "failed" report per pending config without consuming it, so recovery still flows through the normal update path. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../kafka_common/KafkaConfigHelper.java | 39 +++++++++++-------- .../kafka_common/MetadataState.java | 5 +++ .../kafka_common/MetadataStateTest.java | 29 ++++++++++++++ 3 files changed, 57 insertions(+), 16 deletions(-) create mode 100644 dd-java-agent/instrumentation/kafka/kafka-common/src/test/java/datadog/trace/instrumentation/kafka_common/MetadataStateTest.java diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java index 4d5e83d14e2..f3efc12415e 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/KafkaConfigHelper.java @@ -43,8 +43,6 @@ public class KafkaConfigHelper { "socket.connection.setup.timeout.ms", "socket.connection.setup.timeout.max.ms", "security.protocol", - // Non-secret auth selectors — values name the mechanism / algorithm in use, - // never carry credentials. Useful for spotting typos like SCRAM-SHA-256 vs -512. "sasl.mechanism", "sasl.kerberos.service.name", "sasl.login.callback.handler.class", @@ -121,26 +119,35 @@ public static void reportPendingConfig(MetadataState state, String clusterId) { reportPendingConfig(state, clusterId, PendingConfig.STATUS_CONNECTED); } - /** Called from failure advice when the client cannot reach / authenticate to the cluster. */ + /** + * Called from failure advice when the client cannot reach / authenticate to the cluster. Peeks + * (does not consume) the pending config, so a later successful update can still emit "connected". + */ public static void reportPendingConfigAsFailed(MetadataState state) { - // clusterId may be unknown on auth/connect failure — emit with whatever we have (often "") - reportPendingConfig(state, state.clusterId, PendingConfig.STATUS_FAILED); + PendingConfig pending = state.peekPendingConfig(); + if (pending != null) { + emitKafkaConfig(pending, state.clusterId, PendingConfig.STATUS_FAILED); + } } private static void reportPendingConfig(MetadataState state, String clusterId, String status) { PendingConfig pending = state.takePendingConfig(); if (pending != null) { - log.debug("Reporting {} config with status={}", pending.type, status); - if (Config.get().isDataStreamsEnabled()) { - AgentTracer.get() - .getDataStreamsMonitoring() - .reportKafkaConfig( - pending.type, - clusterId != null ? clusterId : "", - pending.consumerGroup, - pending.config, - status); - } + emitKafkaConfig(pending, clusterId, status); + } + } + + private static void emitKafkaConfig(PendingConfig pending, String clusterId, String status) { + log.debug("Reporting {} config with status={}", pending.type, status); + if (Config.get().isDataStreamsEnabled()) { + AgentTracer.get() + .getDataStreamsMonitoring() + .reportKafkaConfig( + pending.type, + clusterId != null ? clusterId : "", + pending.consumerGroup, + pending.config, + status); } } diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/MetadataState.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/MetadataState.java index a4444a844ce..c129ca44d40 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/MetadataState.java +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/MetadataState.java @@ -15,4 +15,9 @@ public void setPendingConfig(PendingConfig config) { public PendingConfig takePendingConfig() { return pendingConfig.getAndSet(null); } + + /** Snapshot the pending config without clearing it. */ + public PendingConfig peekPendingConfig() { + return pendingConfig.get(); + } } diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/test/java/datadog/trace/instrumentation/kafka_common/MetadataStateTest.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/test/java/datadog/trace/instrumentation/kafka_common/MetadataStateTest.java new file mode 100644 index 00000000000..3cd87453bff --- /dev/null +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/test/java/datadog/trace/instrumentation/kafka_common/MetadataStateTest.java @@ -0,0 +1,29 @@ +package datadog.trace.instrumentation.kafka_common; + +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; + +import java.util.HashMap; +import org.junit.jupiter.api.Test; + +class MetadataStateTest { + + private static PendingConfig newPending() { + return new PendingConfig("kafka_producer", "", new HashMap<>()); + } + + @Test + void peekDoesNotConsumePendingConfig() { + MetadataState state = new MetadataState(); + PendingConfig pending = newPending(); + state.setPendingConfig(pending); + + // Reviewer's concern: a transient failedUpdate must leave the pending config in place + // so a later successful update can still take it and emit "connected". + assertSame(pending, state.peekPendingConfig()); + assertSame(pending, state.peekPendingConfig()); + + assertSame(pending, state.takePendingConfig()); + assertNull(state.peekPendingConfig()); + } +}