From 45057c58bf73f60f5200ea73b85886bb72bc3772 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Wed, 18 Feb 2026 20:05:34 +0100 Subject: [PATCH 01/20] initial spark launcher instrumentation --- .../spark/spark-common/build.gradle | 2 + .../spark/SparkExitAdvice.java | 8 + .../spark/SparkLauncherAdvice.java | 144 ++++++++++++++++++ .../spark/SparkLauncherInstrumentation.java | 52 +++++++ .../spark/AbstractSparkTest.groovy | 67 ++++++++ 5 files changed, 273 insertions(+) create mode 100644 dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java create mode 100644 dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java diff --git a/dd-java-agent/instrumentation/spark/spark-common/build.gradle b/dd-java-agent/instrumentation/spark/spark-common/build.gradle index 84dd5cca6a5..724bfec3f67 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/build.gradle +++ b/dd-java-agent/instrumentation/spark/spark-common/build.gradle @@ -10,6 +10,7 @@ configurations.configureEach { dependencies { compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0' compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0' + compileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0' testFixturesImplementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.2' testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0' @@ -21,6 +22,7 @@ dependencies { testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0' testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0' testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '2.4.0' + testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0' 
testFixturesCompileOnly(libs.bundles.groovy) testFixturesCompileOnly(libs.bundles.spock) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java index 1306da2b373..6a26391af1c 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java @@ -21,6 +21,14 @@ public static void enter(@Advice.Argument(0) int exitCode) { .getMethod( "finishApplication", long.class, Throwable.class, int.class, String.class); method.invoke(datadogListener, System.currentTimeMillis(), null, exitCode, null); + } else { + // No Spark listener means we may be in a launcher-only process (e.g. SparkLauncher on EMR) + Class adviceClass = + Thread.currentThread() + .getContextClassLoader() + .loadClass("datadog.trace.instrumentation.spark.SparkLauncherAdvice"); + Method finishMethod = adviceClass.getDeclaredMethod("finishLauncherSpan", int.class); + finishMethod.invoke(null, exitCode); } } catch (Exception ignored) { } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java new file mode 100644 index 00000000000..e245a4a16c4 --- /dev/null +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -0,0 +1,144 @@ +package datadog.trace.instrumentation.spark; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import net.bytebuddy.asm.Advice; +import 
org.apache.spark.launcher.SparkAppHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SparkLauncherAdvice { + + private static final Logger log = LoggerFactory.getLogger(SparkLauncherAdvice.class); + + /** The launcher span, accessible from SparkExitAdvice via reflection. */ + static volatile AgentSpan launcherSpan; + + private static volatile boolean shutdownHookRegistered = false; + + private static synchronized void createLauncherSpan(String resource) { + if (launcherSpan != null) { + return; + } + + AgentTracer.TracerAPI tracer = AgentTracer.get(); + AgentSpan span = + tracer + .buildSpan("spark.launcher") + .withSpanType("spark") + .withResourceName(resource) + .start(); + span.setSamplingPriority( + datadog.trace.api.sampling.PrioritySampling.USER_KEEP, + datadog.trace.api.sampling.SamplingMechanism.DATA_JOBS); + launcherSpan = span; + + if (!shutdownHookRegistered) { + shutdownHookRegistered = true; + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + AgentSpan s = launcherSpan; + if (s != null) { + log.info("Finishing spark.launcher span from shutdown hook"); + s.finish(); + launcherSpan = null; + } + })); + } + } + + static synchronized void finishLauncherSpan(int exitCode) { + AgentSpan span = launcherSpan; + if (span == null) { + return; + } + if (exitCode != 0) { + span.setError(true); + span.setTag("error.type", "Launcher process failed with exit code " + exitCode); + } + span.finish(); + launcherSpan = null; + } + + public static class StartApplicationAdvice { + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void exit( + @Advice.Return SparkAppHandle handle, @Advice.Thrown Throwable throwable) { + createLauncherSpan("SparkLauncher.startApplication"); + + if (throwable != null) { + AgentSpan span = launcherSpan; + if (span != null) { + span.setError(true); + span.addThrowable(throwable); + span.finish(); + launcherSpan = null; + } + return; + } + + if (handle != 
null) { + try { + handle.addListener(new AppHandleListener()); + } catch (Exception e) { + log.debug("Failed to register SparkAppHandle listener", e); + } + } + } + } + + public static class LaunchAdvice { + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void exit(@Advice.Thrown Throwable throwable) { + createLauncherSpan("SparkLauncher.launch"); + + if (throwable != null) { + AgentSpan span = launcherSpan; + if (span != null) { + span.setError(true); + span.addThrowable(throwable); + span.finish(); + launcherSpan = null; + } + } + } + } + + static class AppHandleListener implements SparkAppHandle.Listener { + @Override + public void stateChanged(SparkAppHandle handle) { + SparkAppHandle.State state = handle.getState(); + AgentSpan span = launcherSpan; + if (span != null) { + span.setTag("spark.launcher.app_state", state.toString()); + + String appId = handle.getAppId(); + if (appId != null) { + span.setTag("spark.app_id", appId); + } + + if (state.isFinal()) { + if (state == SparkAppHandle.State.FAILED + || state == SparkAppHandle.State.KILLED + || state == SparkAppHandle.State.LOST) { + span.setError(true); + span.setTag("error.type", "Spark application " + state); + } + } + } + } + + @Override + public void infoChanged(SparkAppHandle handle) { + AgentSpan span = launcherSpan; + if (span != null) { + String appId = handle.getAppId(); + if (appId != null) { + span.setTag("spark.app_id", appId); + } + } + } + } +} diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java new file mode 100644 index 00000000000..669643912ea --- /dev/null +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java @@ -0,0 +1,52 @@ +package 
datadog.trace.instrumentation.spark; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.api.InstrumenterConfig; + +@AutoService(InstrumenterModule.class) +public class SparkLauncherInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public SparkLauncherInstrumentation() { + super("spark-launcher"); + } + + @Override + protected boolean defaultEnabled() { + return InstrumenterConfig.get().isDataJobsEnabled(); + } + + @Override + public String instrumentedType() { + return "org.apache.spark.launcher.SparkLauncher"; + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".SparkLauncherAdvice", + packageName + ".SparkLauncherAdvice$AppHandleListener", + }; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(named("startApplication")) + .and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))), + packageName + ".SparkLauncherAdvice$StartApplicationAdvice"); + + transformer.applyAdvice( + isMethod() + .and(named("launch")) + .and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))), + packageName + ".SparkLauncherAdvice$LaunchAdvice"); + } +} diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy index c39f42f473c..5280d43e49c 100644 --- 
a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy @@ -780,4 +780,71 @@ abstract class AbstractSparkTest extends InstrumentationSpecification { } } } + + def "generate spark.launcher span from startApplication"() { + setup: + // Reset any previous launcher span + SparkLauncherAdvice.launcherSpan = null + + when: + // Directly invoke the advice to simulate what would happen when startApplication is called + def tracer = datadog.trace.bootstrap.instrumentation.api.AgentTracer.get() + def span = tracer + .buildSpan("spark.launcher") + .withSpanType("spark") + .withResourceName("SparkLauncher.startApplication") + .start() + span.setSamplingPriority( + datadog.trace.api.sampling.PrioritySampling.USER_KEEP, + datadog.trace.api.sampling.SamplingMechanism.DATA_JOBS) + SparkLauncherAdvice.launcherSpan = span + + // Simulate a non-zero exit finishing the launcher span + SparkLauncherAdvice.finishLauncherSpan(1) + + then: + assertTraces(1) { + trace(1) { + span { + operationName "spark.launcher" + spanType "spark" + resourceName "SparkLauncher.startApplication" + errored true + assert span.tags["error.type"] == "Launcher process failed with exit code 1" + } + } + } + } + + def "generate spark.launcher span with successful exit"() { + setup: + SparkLauncherAdvice.launcherSpan = null + + when: + def tracer = datadog.trace.bootstrap.instrumentation.api.AgentTracer.get() + def span = tracer + .buildSpan("spark.launcher") + .withSpanType("spark") + .withResourceName("SparkLauncher.launch") + .start() + span.setSamplingPriority( + datadog.trace.api.sampling.PrioritySampling.USER_KEEP, + datadog.trace.api.sampling.SamplingMechanism.DATA_JOBS) + SparkLauncherAdvice.launcherSpan = span + + // Simulate a successful exit + SparkLauncherAdvice.finishLauncherSpan(0) + 
+ then: + assertTraces(1) { + trace(1) { + span { + operationName "spark.launcher" + spanType "spark" + resourceName "SparkLauncher.launch" + errored false + } + } + } + } } From ae18996e4a42992131d87d49abc80041db40d228 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Wed, 18 Feb 2026 20:21:02 +0100 Subject: [PATCH 02/20] use ddtags --- .../spark/SparkLauncherAdvice.java | 19 +++++++------------ .../spark/SparkLauncherInstrumentation.java | 3 +-- .../spark/AbstractSparkTest.groovy | 2 +- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index e245a4a16c4..cfd14c65a81 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -1,5 +1,8 @@ package datadog.trace.instrumentation.spark; +import datadog.trace.api.DDTags; +import datadog.trace.api.sampling.PrioritySampling; +import datadog.trace.api.sampling.SamplingMechanism; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import net.bytebuddy.asm.Advice; @@ -23,14 +26,8 @@ private static synchronized void createLauncherSpan(String resource) { AgentTracer.TracerAPI tracer = AgentTracer.get(); AgentSpan span = - tracer - .buildSpan("spark.launcher") - .withSpanType("spark") - .withResourceName(resource) - .start(); - span.setSamplingPriority( - datadog.trace.api.sampling.PrioritySampling.USER_KEEP, - datadog.trace.api.sampling.SamplingMechanism.DATA_JOBS); + tracer.buildSpan("spark.launcher").withSpanType("spark").withResourceName(resource).start(); + 
span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS); launcherSpan = span; if (!shutdownHookRegistered) { @@ -56,7 +53,7 @@ static synchronized void finishLauncherSpan(int exitCode) { } if (exitCode != 0) { span.setError(true); - span.setTag("error.type", "Launcher process failed with exit code " + exitCode); + span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed with exit code " + exitCode); } span.finish(); launcherSpan = null; @@ -71,7 +68,6 @@ public static void exit( if (throwable != null) { AgentSpan span = launcherSpan; if (span != null) { - span.setError(true); span.addThrowable(throwable); span.finish(); launcherSpan = null; @@ -97,7 +93,6 @@ public static void exit(@Advice.Thrown Throwable throwable) { if (throwable != null) { AgentSpan span = launcherSpan; if (span != null) { - span.setError(true); span.addThrowable(throwable); span.finish(); launcherSpan = null; @@ -124,7 +119,7 @@ public void stateChanged(SparkAppHandle handle) { || state == SparkAppHandle.State.KILLED || state == SparkAppHandle.State.LOST) { span.setError(true); - span.setTag("error.type", "Spark application " + state); + span.setTag(DDTags.ERROR_TYPE, "Spark Application " + state); } } } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java index 669643912ea..861a37175f4 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java @@ -30,8 +30,7 @@ public String instrumentedType() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".SparkLauncherAdvice", - packageName + 
".SparkLauncherAdvice$AppHandleListener", + packageName + ".SparkLauncherAdvice", packageName + ".SparkLauncherAdvice$AppHandleListener", }; } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy index 5280d43e49c..93b7502abf3 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy @@ -810,7 +810,7 @@ abstract class AbstractSparkTest extends InstrumentationSpecification { spanType "spark" resourceName "SparkLauncher.startApplication" errored true - assert span.tags["error.type"] == "Launcher process failed with exit code 1" + assert span.tags["error.type"] == "Spark Launcher Failed with exit code 1" } } } From edbea75c384dcc14c6a15d58c02d10d519e4220a Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Thu, 19 Feb 2026 10:37:50 +0100 Subject: [PATCH 03/20] Fix tess --- .../instrumentation/spark/AbstractSparkTest.groovy | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy index 93b7502abf3..e89800d2c8f 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy @@ -789,15 +789,15 @@ abstract class 
AbstractSparkTest extends InstrumentationSpecification { when: // Directly invoke the advice to simulate what would happen when startApplication is called def tracer = datadog.trace.bootstrap.instrumentation.api.AgentTracer.get() - def span = tracer + def launcherSpan = tracer .buildSpan("spark.launcher") .withSpanType("spark") .withResourceName("SparkLauncher.startApplication") .start() - span.setSamplingPriority( + launcherSpan.setSamplingPriority( datadog.trace.api.sampling.PrioritySampling.USER_KEEP, datadog.trace.api.sampling.SamplingMechanism.DATA_JOBS) - SparkLauncherAdvice.launcherSpan = span + SparkLauncherAdvice.launcherSpan = launcherSpan // Simulate a non-zero exit finishing the launcher span SparkLauncherAdvice.finishLauncherSpan(1) @@ -822,15 +822,15 @@ abstract class AbstractSparkTest extends InstrumentationSpecification { when: def tracer = datadog.trace.bootstrap.instrumentation.api.AgentTracer.get() - def span = tracer + def launcherSpan = tracer .buildSpan("spark.launcher") .withSpanType("spark") .withResourceName("SparkLauncher.launch") .start() - span.setSamplingPriority( + launcherSpan.setSamplingPriority( datadog.trace.api.sampling.PrioritySampling.USER_KEEP, datadog.trace.api.sampling.SamplingMechanism.DATA_JOBS) - SparkLauncherAdvice.launcherSpan = span + SparkLauncherAdvice.launcherSpan = launcherSpan // Simulate a successful exit SparkLauncherAdvice.finishLauncherSpan(0) From 9794da840c3f21ed0b96f90e7e78c9ccfa6873d3 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Thu, 19 Feb 2026 12:22:37 +0100 Subject: [PATCH 04/20] move test to the right /test dir --- .../spark/spark-common/build.gradle | 4 ++ .../spark/SparkLauncherTest.groovy | 72 +++++++++++++++++++ .../spark/AbstractSparkTest.groovy | 67 ----------------- 3 files changed, 76 insertions(+), 67 deletions(-) create mode 100644 dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy diff --git 
a/dd-java-agent/instrumentation/spark/spark-common/build.gradle b/dd-java-agent/instrumentation/spark/spark-common/build.gradle index 724bfec3f67..f19ebd38b4a 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/build.gradle +++ b/dd-java-agent/instrumentation/spark/spark-common/build.gradle @@ -26,4 +26,8 @@ dependencies { testFixturesCompileOnly(libs.bundles.groovy) testFixturesCompileOnly(libs.bundles.spock) + + testImplementation project(':dd-java-agent:instrumentation-testing') + testImplementation group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0' } + diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy new file mode 100644 index 00000000000..cf1ffcf25ed --- /dev/null +++ b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy @@ -0,0 +1,72 @@ +package datadog.trace.instrumentation.spark + +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.api.sampling.PrioritySampling +import datadog.trace.api.sampling.SamplingMechanism +import datadog.trace.bootstrap.instrumentation.api.AgentTracer + +class SparkLauncherTest extends InstrumentationSpecification { + + def "generate spark.launcher span from startApplication"() { + setup: + SparkLauncherAdvice.launcherSpan = null + + when: + def tracer = AgentTracer.get() + def launcherSpan = tracer + .buildSpan("spark.launcher") + .withSpanType("spark") + .withResourceName("SparkLauncher.startApplication") + .start() + launcherSpan.setSamplingPriority( + PrioritySampling.USER_KEEP, + SamplingMechanism.DATA_JOBS) + SparkLauncherAdvice.launcherSpan = launcherSpan + + SparkLauncherAdvice.finishLauncherSpan(1) + + then: + assertTraces(1) { + trace(1) { + span { + operationName 
"spark.launcher" + spanType "spark" + resourceName "SparkLauncher.startApplication" + errored true + assert span.tags["error.type"] == "Spark Launcher Failed with exit code 1" + } + } + } + } + + def "generate spark.launcher span with successful exit"() { + setup: + SparkLauncherAdvice.launcherSpan = null + + when: + def tracer = AgentTracer.get() + def launcherSpan = tracer + .buildSpan("spark.launcher") + .withSpanType("spark") + .withResourceName("SparkLauncher.launch") + .start() + launcherSpan.setSamplingPriority( + PrioritySampling.USER_KEEP, + SamplingMechanism.DATA_JOBS) + SparkLauncherAdvice.launcherSpan = launcherSpan + + SparkLauncherAdvice.finishLauncherSpan(0) + + then: + assertTraces(1) { + trace(1) { + span { + operationName "spark.launcher" + spanType "spark" + resourceName "SparkLauncher.launch" + errored false + } + } + } + } +} diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy index e89800d2c8f..c39f42f473c 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy @@ -780,71 +780,4 @@ abstract class AbstractSparkTest extends InstrumentationSpecification { } } } - - def "generate spark.launcher span from startApplication"() { - setup: - // Reset any previous launcher span - SparkLauncherAdvice.launcherSpan = null - - when: - // Directly invoke the advice to simulate what would happen when startApplication is called - def tracer = datadog.trace.bootstrap.instrumentation.api.AgentTracer.get() - def launcherSpan = tracer - .buildSpan("spark.launcher") - .withSpanType("spark") - 
.withResourceName("SparkLauncher.startApplication") - .start() - launcherSpan.setSamplingPriority( - datadog.trace.api.sampling.PrioritySampling.USER_KEEP, - datadog.trace.api.sampling.SamplingMechanism.DATA_JOBS) - SparkLauncherAdvice.launcherSpan = launcherSpan - - // Simulate a non-zero exit finishing the launcher span - SparkLauncherAdvice.finishLauncherSpan(1) - - then: - assertTraces(1) { - trace(1) { - span { - operationName "spark.launcher" - spanType "spark" - resourceName "SparkLauncher.startApplication" - errored true - assert span.tags["error.type"] == "Spark Launcher Failed with exit code 1" - } - } - } - } - - def "generate spark.launcher span with successful exit"() { - setup: - SparkLauncherAdvice.launcherSpan = null - - when: - def tracer = datadog.trace.bootstrap.instrumentation.api.AgentTracer.get() - def launcherSpan = tracer - .buildSpan("spark.launcher") - .withSpanType("spark") - .withResourceName("SparkLauncher.launch") - .start() - launcherSpan.setSamplingPriority( - datadog.trace.api.sampling.PrioritySampling.USER_KEEP, - datadog.trace.api.sampling.SamplingMechanism.DATA_JOBS) - SparkLauncherAdvice.launcherSpan = launcherSpan - - // Simulate a successful exit - SparkLauncherAdvice.finishLauncherSpan(0) - - then: - assertTraces(1) { - trace(1) { - span { - operationName "spark.launcher" - spanType "spark" - resourceName "SparkLauncher.launch" - errored false - } - } - } - } } From bf992601073d913a52f26e9de407f29179c10efd Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Thu, 19 Feb 2026 17:15:05 +0100 Subject: [PATCH 05/20] advice should be public --- .../trace/instrumentation/spark/SparkLauncherAdvice.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index 
cfd14c65a81..e183140e4a9 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -10,16 +10,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class SparkLauncherAdvice { +public class SparkLauncherAdvice { private static final Logger log = LoggerFactory.getLogger(SparkLauncherAdvice.class); /** The launcher span, accessible from SparkExitAdvice via reflection. */ - static volatile AgentSpan launcherSpan; + public static volatile AgentSpan launcherSpan; private static volatile boolean shutdownHookRegistered = false; - private static synchronized void createLauncherSpan(String resource) { + public static synchronized void createLauncherSpan(String resource) { if (launcherSpan != null) { return; } @@ -46,7 +46,7 @@ private static synchronized void createLauncherSpan(String resource) { } } - static synchronized void finishLauncherSpan(int exitCode) { + public static synchronized void finishLauncherSpan(int exitCode) { AgentSpan span = launcherSpan; if (span == null) { return; From 74326e0291793932aeb557c625b3854e4b848619 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Thu, 19 Feb 2026 18:44:44 +0100 Subject: [PATCH 06/20] finish launcher span with error via RunMainAdvice --- .../spark/AbstractSparkInstrumentation.java | 11 +++++++++++ .../instrumentation/spark/SparkLauncherAdvice.java | 14 +++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java index 80c71a4f64b..fe777e7f276 100644 --- 
a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java @@ -106,6 +106,17 @@ public static void exit(@Advice.Thrown Throwable throwable) { if (AbstractDatadogSparkListener.listener != null) { AbstractDatadogSparkListener.listener.finishApplication( System.currentTimeMillis(), throwable, 0, null); + } else { + try { + Class adviceClass = + Thread.currentThread() + .getContextClassLoader() + .loadClass("datadog.trace.instrumentation.spark.SparkLauncherAdvice"); + java.lang.reflect.Method finishMethod = + adviceClass.getMethod("finishLauncherSpan", Throwable.class); + finishMethod.invoke(null, throwable); + } catch (Exception ignored) { + } } } } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index e183140e4a9..302de156ae6 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -26,7 +26,7 @@ public static synchronized void createLauncherSpan(String resource) { AgentTracer.TracerAPI tracer = AgentTracer.get(); AgentSpan span = - tracer.buildSpan("spark.launcher").withSpanType("spark").withResourceName(resource).start(); + tracer.buildSpan("spark.launcher.launch").withSpanType("spark").withResourceName(resource).start(); span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS); launcherSpan = span; @@ -59,6 +59,18 @@ public static synchronized void finishLauncherSpan(int exitCode) { launcherSpan = null; } + 
public static synchronized void finishLauncherSpan(Throwable throwable) { + AgentSpan span = launcherSpan; + if (span == null) { + return; + } + if (throwable != null) { + span.addThrowable(throwable); + } + span.finish(); + launcherSpan = null; + } + public static class StartApplicationAdvice { @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) public static void exit( From 83bdee0ef23f1b0480eefdc6663b512f7747209b Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 10:29:19 +0100 Subject: [PATCH 07/20] sportLess --- .../trace/instrumentation/spark/SparkLauncherAdvice.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index 302de156ae6..5387ff95912 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -26,7 +26,11 @@ public static synchronized void createLauncherSpan(String resource) { AgentTracer.TracerAPI tracer = AgentTracer.get(); AgentSpan span = - tracer.buildSpan("spark.launcher.launch").withSpanType("spark").withResourceName(resource).start(); + tracer + .buildSpan("spark.launcher.launch") + .withSpanType("spark") + .withResourceName(resource) + .start(); span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS); launcherSpan = span; From b6406c28b01f662520c6508720fda2c228863461 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 10:34:04 +0100 Subject: [PATCH 08/20] synchronize shutdown hook --- .../instrumentation/spark/SparkLauncherAdvice.java | 12 +++++++----- 1 file changed, 7 
insertions(+), 5 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index 5387ff95912..4c272a28804 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -40,11 +40,13 @@ public static synchronized void createLauncherSpan(String resource) { .addShutdownHook( new Thread( () -> { - AgentSpan s = launcherSpan; - if (s != null) { - log.info("Finishing spark.launcher span from shutdown hook"); - s.finish(); - launcherSpan = null; + synchronized (SparkLauncherAdvice.class) { + AgentSpan s = launcherSpan; + if (s != null) { + log.info("Finishing spark.launcher span from shutdown hook"); + s.finish(); + launcherSpan = null; + } } })); } From 5c57154c2d60cb2f415207f7494bb8ab5e106de7 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 10:55:57 +0100 Subject: [PATCH 09/20] Capture more spark relevant attrs --- .../spark/SparkLauncherAdvice.java | 83 +++++++++++++++++-- 1 file changed, 78 insertions(+), 5 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index 4c272a28804..0b15d150f30 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -5,6 +5,9 @@ import 
datadog.trace.api.sampling.SamplingMechanism; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.regex.Pattern; import net.bytebuddy.asm.Advice; import org.apache.spark.launcher.SparkAppHandle; import org.slf4j.Logger; @@ -19,7 +22,70 @@ public class SparkLauncherAdvice { private static volatile boolean shutdownHookRegistered = false; - public static synchronized void createLauncherSpan(String resource) { + /** Extract SparkLauncher configuration via reflection and set as span tags. */ + private static void setLauncherConfigTags(AgentSpan span, Object launcher) { + try { + // SparkLauncher extends AbstractLauncher which has a 'builder' field + Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder"); + builderField.setAccessible(true); + Object builder = builderField.get(launcher); + if (builder == null) { + return; + } + + Class builderClass = builder.getClass(); + // Fields are on AbstractCommandBuilder (parent of SparkSubmitCommandBuilder) + Class abstractBuilderClass = builderClass.getSuperclass(); + + setStringFieldAsTag(span, builder, abstractBuilderClass, "master", "master"); + setStringFieldAsTag(span, builder, abstractBuilderClass, "deployMode", "deploy_mode"); + setStringFieldAsTag(span, builder, abstractBuilderClass, "appName", "application_name"); + setStringFieldAsTag(span, builder, abstractBuilderClass, "mainClass", "main_class"); + setStringFieldAsTag(span, builder, abstractBuilderClass, "appResource", "app_resource"); + + // Extract spark conf entries and redact sensitive values + try { + Field confField = abstractBuilderClass.getDeclaredField("conf"); + confField.setAccessible(true); + @SuppressWarnings("unchecked") + Map conf = (Map) confField.get(builder); + if (conf != null) { + Pattern redactionPattern = + Pattern.compile("(?i)secret|password|token|access.key|api.key"); 
+ for (Map.Entry entry : conf.entrySet()) { + if (SparkConfAllowList.canCaptureJobParameter(entry.getKey())) { + String value = entry.getValue(); + if (redactionPattern.matcher(entry.getKey()).find() + || redactionPattern.matcher(value).find()) { + value = "[redacted]"; + } + span.setTag("config." + entry.getKey().replace('.', '_'), value); + } + } + } + } catch (NoSuchFieldException e) { + log.debug("Could not find conf field on builder", e); + } + } catch (Exception e) { + log.debug("Failed to extract SparkLauncher configuration", e); + } + } + + private static void setStringFieldAsTag( + AgentSpan span, Object obj, Class clazz, String fieldName, String tagName) { + try { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + Object value = field.get(obj); + if (value != null) { + span.setTag(tagName, value.toString()); + } + } catch (Exception e) { + log.debug("Could not read field {} from builder", fieldName, e); + } + } + + public static synchronized void createLauncherSpan(String resource, Object launcher) { if (launcherSpan != null) { return; } @@ -32,6 +98,11 @@ public static synchronized void createLauncherSpan(String resource) { .withResourceName(resource) .start(); span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS); + + if (launcher != null) { + setLauncherConfigTags(span, launcher); + } + launcherSpan = span; if (!shutdownHookRegistered) { @@ -80,8 +151,10 @@ public static synchronized void finishLauncherSpan(Throwable throwable) { public static class StartApplicationAdvice { @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) public static void exit( - @Advice.Return SparkAppHandle handle, @Advice.Thrown Throwable throwable) { - createLauncherSpan("SparkLauncher.startApplication"); + @Advice.This Object launcher, + @Advice.Return SparkAppHandle handle, + @Advice.Thrown Throwable throwable) { + createLauncherSpan("SparkLauncher.startApplication", launcher); if 
(throwable != null) { AgentSpan span = launcherSpan; @@ -105,8 +178,8 @@ public static void exit( public static class LaunchAdvice { @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) - public static void exit(@Advice.Thrown Throwable throwable) { - createLauncherSpan("SparkLauncher.launch"); + public static void exit(@Advice.This Object launcher, @Advice.Thrown Throwable throwable) { + createLauncherSpan("SparkLauncher.launch", launcher); if (throwable != null) { AgentSpan span = launcherSpan; From f7d45ac3f2117eec2adb91112baea4eb874f92ca Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 11:07:28 +0100 Subject: [PATCH 10/20] Update tests with new attrs --- .../spark/SparkLauncherAdvice.java | 10 +-- .../spark/SparkLauncherTest.groovy | 65 +++++++++++++++++++ 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index 0b15d150f30..bedc1f63257 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -17,6 +17,10 @@ public class SparkLauncherAdvice { private static final Logger log = LoggerFactory.getLogger(SparkLauncherAdvice.class); + // Same default pattern as spark.redaction.regex in Spark source + private static final Pattern CONF_REDACTION_PATTERN = + Pattern.compile("(?i)secret|password|token|access.key|api.key"); + /** The launcher span, accessible from SparkExitAdvice via reflection. 
*/ public static volatile AgentSpan launcherSpan; @@ -50,13 +54,11 @@ private static void setLauncherConfigTags(AgentSpan span, Object launcher) { @SuppressWarnings("unchecked") Map conf = (Map) confField.get(builder); if (conf != null) { - Pattern redactionPattern = - Pattern.compile("(?i)secret|password|token|access.key|api.key"); for (Map.Entry entry : conf.entrySet()) { if (SparkConfAllowList.canCaptureJobParameter(entry.getKey())) { String value = entry.getValue(); - if (redactionPattern.matcher(entry.getKey()).find() - || redactionPattern.matcher(value).find()) { + if (CONF_REDACTION_PATTERN.matcher(entry.getKey()).find() + || CONF_REDACTION_PATTERN.matcher(value).find()) { value = "[redacted]"; } span.setTag("config." + entry.getKey().replace('.', '_'), value); diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy index cf1ffcf25ed..e68d8b27667 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy @@ -4,9 +4,74 @@ import datadog.trace.agent.test.InstrumentationSpecification import datadog.trace.api.sampling.PrioritySampling import datadog.trace.api.sampling.SamplingMechanism import datadog.trace.bootstrap.instrumentation.api.AgentTracer +import org.apache.spark.launcher.SparkLauncher class SparkLauncherTest extends InstrumentationSpecification { + def "createLauncherSpan extracts config tags from SparkLauncher"() { + setup: + SparkLauncherAdvice.launcherSpan = null + + when: + def launcher = new SparkLauncher() + .setMaster("yarn") + .setDeployMode("cluster") + .setAppName("test-app") + .setMainClass("com.example.Main") + 
.setAppResource("/path/to/app.jar") + .setConf("spark.executor.memory", "4g") + .setConf("spark.executor.instances", "10") + SparkLauncherAdvice.createLauncherSpan("SparkLauncher.startApplication", launcher) + SparkLauncherAdvice.finishLauncherSpan(0) + + then: + assertTraces(1) { + trace(1) { + span { + operationName "spark.launcher.launch" + spanType "spark" + resourceName "SparkLauncher.startApplication" + errored false + assert span.tags["master"] == "yarn" + assert span.tags["deploy_mode"] == "cluster" + assert span.tags["application_name"] == "test-app" + assert span.tags["main_class"] == "com.example.Main" + assert span.tags["app_resource"] == "/path/to/app.jar" + assert span.tags["config.spark_executor_memory"] == "4g" + assert span.tags["config.spark_executor_instances"] == "10" + } + } + } + } + + def "createLauncherSpan redacts sensitive conf values"() { + setup: + SparkLauncherAdvice.launcherSpan = null + + when: + def launcher = new SparkLauncher() + // spark.app.name is allowlisted; its value contains "secret" so should be redacted + .setConf("spark.app.name", "my-secret-app") + // spark.master is allowlisted; its value is harmless so should pass through + .setConf("spark.master", "yarn") + SparkLauncherAdvice.createLauncherSpan("SparkLauncher.startApplication", launcher) + SparkLauncherAdvice.finishLauncherSpan(0) + + then: + assertTraces(1) { + trace(1) { + span { + operationName "spark.launcher.launch" + spanType "spark" + resourceName "SparkLauncher.startApplication" + errored false + assert span.tags["config.spark_app_name"] == "[redacted]" + assert span.tags["config.spark_master"] == "yarn" + } + } + } + } + def "generate spark.launcher span from startApplication"() { setup: SparkLauncherAdvice.launcherSpan = null From 3f0d8a07a197a635f6fef149d0c2efd8b7427f0b Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 13:19:54 +0100 Subject: [PATCH 11/20] fix sportBugsMain and muzzle --- 
.../trace/instrumentation/spark/SparkLauncherAdvice.java | 2 ++ .../instrumentation/spark/SparkLauncherInstrumentation.java | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index bedc1f63257..7a5354bda58 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -5,6 +5,7 @@ import datadog.trace.api.sampling.SamplingMechanism; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.lang.reflect.Field; import java.util.Map; import java.util.regex.Pattern; @@ -22,6 +23,7 @@ public class SparkLauncherAdvice { Pattern.compile("(?i)secret|password|token|access.key|api.key"); /** The launcher span, accessible from SparkExitAdvice via reflection. 
*/ + @SuppressFBWarnings("PA_PUBLIC_PRIMITIVE_ATTRIBUTE") public static volatile AgentSpan launcherSpan; private static volatile boolean shutdownHookRegistered = false; diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java index 861a37175f4..ae83c5983df 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java @@ -30,7 +30,9 @@ public String instrumentedType() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".SparkLauncherAdvice", packageName + ".SparkLauncherAdvice$AppHandleListener", + packageName + ".SparkConfAllowList", + packageName + ".SparkLauncherAdvice", + packageName + ".SparkLauncherAdvice$AppHandleListener", }; } From 559ce6c24eab202b9832c88a97714b5e071d1274 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 14:06:52 +0100 Subject: [PATCH 12/20] remove SparkLauncher.launch() instrumentation --- .../spark/AbstractSparkInstrumentation.java | 11 --- .../spark/SparkExitAdvice.java | 8 -- .../spark/SparkLauncherAdvice.java | 53 +++-------- .../spark/SparkLauncherInstrumentation.java | 6 -- .../spark/SparkLauncherTest.groovy | 88 +++++++++++++------ 5 files changed, 73 insertions(+), 93 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java index fe777e7f276..80c71a4f64b 100644 --- 
a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java @@ -106,17 +106,6 @@ public static void exit(@Advice.Thrown Throwable throwable) { if (AbstractDatadogSparkListener.listener != null) { AbstractDatadogSparkListener.listener.finishApplication( System.currentTimeMillis(), throwable, 0, null); - } else { - try { - Class adviceClass = - Thread.currentThread() - .getContextClassLoader() - .loadClass("datadog.trace.instrumentation.spark.SparkLauncherAdvice"); - java.lang.reflect.Method finishMethod = - adviceClass.getMethod("finishLauncherSpan", Throwable.class); - finishMethod.invoke(null, throwable); - } catch (Exception ignored) { - } } } } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java index 6a26391af1c..1306da2b373 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java @@ -21,14 +21,6 @@ public static void enter(@Advice.Argument(0) int exitCode) { .getMethod( "finishApplication", long.class, Throwable.class, int.class, String.class); method.invoke(datadogListener, System.currentTimeMillis(), null, exitCode, null); - } else { - // No Spark listener means we may be in a launcher-only process (e.g. 
SparkLauncher on EMR) - Class adviceClass = - Thread.currentThread() - .getContextClassLoader() - .loadClass("datadog.trace.instrumentation.spark.SparkLauncherAdvice"); - Method finishMethod = adviceClass.getDeclaredMethod("finishLauncherSpan", int.class); - finishMethod.invoke(null, exitCode); } } catch (Exception ignored) { } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index 7a5354bda58..4e7961fdf22 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -5,7 +5,6 @@ import datadog.trace.api.sampling.SamplingMechanism; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.lang.reflect.Field; import java.util.Map; import java.util.regex.Pattern; @@ -22,9 +21,7 @@ public class SparkLauncherAdvice { private static final Pattern CONF_REDACTION_PATTERN = Pattern.compile("(?i)secret|password|token|access.key|api.key"); - /** The launcher span, accessible from SparkExitAdvice via reflection. 
*/ - @SuppressFBWarnings("PA_PUBLIC_PRIMITIVE_ATTRIBUTE") - public static volatile AgentSpan launcherSpan; + static volatile AgentSpan launcherSpan; private static volatile boolean shutdownHookRegistered = false; @@ -89,7 +86,7 @@ private static void setStringFieldAsTag( } } - public static synchronized void createLauncherSpan(String resource, Object launcher) { + static synchronized void createLauncherSpan(Object launcher) { if (launcherSpan != null) { return; } @@ -99,14 +96,10 @@ public static synchronized void createLauncherSpan(String resource, Object launc tracer .buildSpan("spark.launcher.launch") .withSpanType("spark") - .withResourceName(resource) + .withResourceName("SparkLauncher.startApplication") .start(); span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS); - - if (launcher != null) { - setLauncherConfigTags(span, launcher); - } - + setLauncherConfigTags(span, launcher); launcherSpan = span; if (!shutdownHookRegistered) { @@ -127,20 +120,20 @@ public static synchronized void createLauncherSpan(String resource, Object launc } } - public static synchronized void finishLauncherSpan(int exitCode) { + static synchronized void finishSpan(boolean isError, String errorType) { AgentSpan span = launcherSpan; if (span == null) { return; } - if (exitCode != 0) { + if (isError) { span.setError(true); - span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed with exit code " + exitCode); + span.setTag(DDTags.ERROR_TYPE, errorType); } span.finish(); launcherSpan = null; } - public static synchronized void finishLauncherSpan(Throwable throwable) { + static synchronized void finishSpanWithThrowable(Throwable throwable) { AgentSpan span = launcherSpan; if (span == null) { return; @@ -158,15 +151,10 @@ public static void exit( @Advice.This Object launcher, @Advice.Return SparkAppHandle handle, @Advice.Thrown Throwable throwable) { - createLauncherSpan("SparkLauncher.startApplication", launcher); + createLauncherSpan(launcher); if (throwable 
!= null) { - AgentSpan span = launcherSpan; - if (span != null) { - span.addThrowable(throwable); - span.finish(); - launcherSpan = null; - } + finishSpanWithThrowable(throwable); return; } @@ -180,22 +168,6 @@ public static void exit( } } - public static class LaunchAdvice { - @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) - public static void exit(@Advice.This Object launcher, @Advice.Thrown Throwable throwable) { - createLauncherSpan("SparkLauncher.launch", launcher); - - if (throwable != null) { - AgentSpan span = launcherSpan; - if (span != null) { - span.addThrowable(throwable); - span.finish(); - launcherSpan = null; - } - } - } - } - static class AppHandleListener implements SparkAppHandle.Listener { @Override public void stateChanged(SparkAppHandle handle) { @@ -213,8 +185,9 @@ public void stateChanged(SparkAppHandle handle) { if (state == SparkAppHandle.State.FAILED || state == SparkAppHandle.State.KILLED || state == SparkAppHandle.State.LOST) { - span.setError(true); - span.setTag(DDTags.ERROR_TYPE, "Spark Application " + state); + finishSpan(true, "Spark Application " + state); + } else { + finishSpan(false, null); } } } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java index ae83c5983df..d3d9c1b1e29 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java @@ -43,11 +43,5 @@ public void methodAdvice(MethodTransformer transformer) { .and(named("startApplication")) .and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))), packageName + 
".SparkLauncherAdvice$StartApplicationAdvice"); - - transformer.applyAdvice( - isMethod() - .and(named("launch")) - .and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))), - packageName + ".SparkLauncherAdvice$LaunchAdvice"); } } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy index e68d8b27667..69678fedcc8 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy @@ -4,11 +4,12 @@ import datadog.trace.agent.test.InstrumentationSpecification import datadog.trace.api.sampling.PrioritySampling import datadog.trace.api.sampling.SamplingMechanism import datadog.trace.bootstrap.instrumentation.api.AgentTracer +import org.apache.spark.launcher.SparkAppHandle import org.apache.spark.launcher.SparkLauncher class SparkLauncherTest extends InstrumentationSpecification { - def "createLauncherSpan extracts config tags from SparkLauncher"() { + def "StartApplicationAdvice extracts config tags from SparkLauncher"() { setup: SparkLauncherAdvice.launcherSpan = null @@ -21,8 +22,10 @@ class SparkLauncherTest extends InstrumentationSpecification { .setAppResource("/path/to/app.jar") .setConf("spark.executor.memory", "4g") .setConf("spark.executor.instances", "10") - SparkLauncherAdvice.createLauncherSpan("SparkLauncher.startApplication", launcher) - SparkLauncherAdvice.finishLauncherSpan(0) + // Call the advice directly (handle=null, throwable=null simulates a successful call + // where no SparkAppHandle is returned — span stays open for shutdown hook) + SparkLauncherAdvice.createLauncherSpan(launcher) + SparkLauncherAdvice.finishSpan(false, 
null) then: assertTraces(1) { @@ -44,7 +47,7 @@ class SparkLauncherTest extends InstrumentationSpecification { } } - def "createLauncherSpan redacts sensitive conf values"() { + def "StartApplicationAdvice redacts sensitive conf values"() { setup: SparkLauncherAdvice.launcherSpan = null @@ -54,8 +57,8 @@ class SparkLauncherTest extends InstrumentationSpecification { .setConf("spark.app.name", "my-secret-app") // spark.master is allowlisted; its value is harmless so should pass through .setConf("spark.master", "yarn") - SparkLauncherAdvice.createLauncherSpan("SparkLauncher.startApplication", launcher) - SparkLauncherAdvice.finishLauncherSpan(0) + SparkLauncherAdvice.createLauncherSpan(launcher) + SparkLauncherAdvice.finishSpan(false, null) then: assertTraces(1) { @@ -72,64 +75,93 @@ class SparkLauncherTest extends InstrumentationSpecification { } } - def "generate spark.launcher span from startApplication"() { + def "finishSpanWithThrowable finishes span with error"() { setup: SparkLauncherAdvice.launcherSpan = null when: + def launcher = new SparkLauncher().setAppName("test-app") + SparkLauncherAdvice.createLauncherSpan(launcher) + SparkLauncherAdvice.finishSpanWithThrowable(new RuntimeException("startApplication failed")) + + then: + SparkLauncherAdvice.launcherSpan == null + assertTraces(1) { + trace(1) { + span { + operationName "spark.launcher.launch" + spanType "spark" + errored true + } + } + } + } + + def "AppHandleListener finishes span on final state FINISHED"() { + setup: + SparkLauncherAdvice.launcherSpan = null def tracer = AgentTracer.get() - def launcherSpan = tracer - .buildSpan("spark.launcher") + SparkLauncherAdvice.launcherSpan = tracer + .buildSpan("spark.launcher.launch") .withSpanType("spark") .withResourceName("SparkLauncher.startApplication") .start() - launcherSpan.setSamplingPriority( + SparkLauncherAdvice.launcherSpan.setSamplingPriority( PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS) - SparkLauncherAdvice.launcherSpan = 
launcherSpan + def listener = new SparkLauncherAdvice.AppHandleListener() + def handle = Mock(SparkAppHandle) - SparkLauncherAdvice.finishLauncherSpan(1) + when: + handle.getState() >> SparkAppHandle.State.FINISHED + handle.getAppId() >> "app-123" + listener.stateChanged(handle) then: + SparkLauncherAdvice.launcherSpan == null assertTraces(1) { trace(1) { span { - operationName "spark.launcher" + operationName "spark.launcher.launch" spanType "spark" - resourceName "SparkLauncher.startApplication" - errored true - assert span.tags["error.type"] == "Spark Launcher Failed with exit code 1" + errored false + assert span.tags["spark.app_id"] == "app-123" + assert span.tags["spark.launcher.app_state"] == "FINISHED" } } } } - def "generate spark.launcher span with successful exit"() { + def "AppHandleListener finishes span with error on FAILED state"() { setup: SparkLauncherAdvice.launcherSpan = null - - when: def tracer = AgentTracer.get() - def launcherSpan = tracer - .buildSpan("spark.launcher") + SparkLauncherAdvice.launcherSpan = tracer + .buildSpan("spark.launcher.launch") .withSpanType("spark") - .withResourceName("SparkLauncher.launch") + .withResourceName("SparkLauncher.startApplication") .start() - launcherSpan.setSamplingPriority( + SparkLauncherAdvice.launcherSpan.setSamplingPriority( PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS) - SparkLauncherAdvice.launcherSpan = launcherSpan + def listener = new SparkLauncherAdvice.AppHandleListener() + def handle = Mock(SparkAppHandle) - SparkLauncherAdvice.finishLauncherSpan(0) + when: + handle.getState() >> SparkAppHandle.State.FAILED + handle.getAppId() >> "app-456" + listener.stateChanged(handle) then: + SparkLauncherAdvice.launcherSpan == null assertTraces(1) { trace(1) { span { - operationName "spark.launcher" + operationName "spark.launcher.launch" spanType "spark" - resourceName "SparkLauncher.launch" - errored false + errored true + assert span.tags["error.type"] == "Spark Application FAILED" + assert 
span.tags["spark.app_id"] == "app-456" } } } From 725bbf020465bbf73fa788b7f70863fe1b6f247f Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 14:44:19 +0100 Subject: [PATCH 13/20] share common config key redaction method --- .../spark/SparkConfAllowList.java | 20 +++++++++++++++---- .../spark/SparkLauncherAdvice.java | 11 +--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java index 08ed700047a..23c13688306 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java @@ -17,6 +17,12 @@ * @see Spark Configuration */ class SparkConfAllowList { + // Using values from + // https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1150-L1158 + static final String DEFAULT_REDACTION_REGEX = "(?i)secret|password|token|access.key|api.key"; + + private static final Pattern DEFAULT_REDACTION_PATTERN = Pattern.compile(DEFAULT_REDACTION_REGEX); + /** * Job-specific parameters that can be used to control job execution or provide metadata about the * job being executed @@ -80,11 +86,17 @@ public static boolean canCaptureJobParameter(String parameterName) { return allowedJobParams.contains(parameterName); } + /** Redact a value if the key or value matches the default redaction pattern. 
*/ + public static String redactValue(String key, String value) { + if (DEFAULT_REDACTION_PATTERN.matcher(key).find() + || DEFAULT_REDACTION_PATTERN.matcher(value).find()) { + return "[redacted]"; + } + return value; + } + public static List> getRedactedSparkConf(SparkConf conf) { - // Using values from - // https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1150-L1158 - String redactionPattern = - conf.get("spark.redaction.regex", "(?i)secret|password|token|access.key|api.key"); + String redactionPattern = conf.get("spark.redaction.regex", DEFAULT_REDACTION_REGEX); List> redacted = new ArrayList<>(); Pattern pattern = Pattern.compile(redactionPattern); diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index 4e7961fdf22..bb596973183 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -7,7 +7,6 @@ import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import java.lang.reflect.Field; import java.util.Map; -import java.util.regex.Pattern; import net.bytebuddy.asm.Advice; import org.apache.spark.launcher.SparkAppHandle; import org.slf4j.Logger; @@ -17,10 +16,6 @@ public class SparkLauncherAdvice { private static final Logger log = LoggerFactory.getLogger(SparkLauncherAdvice.class); - // Same default pattern as spark.redaction.regex in Spark source - private static final Pattern CONF_REDACTION_PATTERN = - Pattern.compile("(?i)secret|password|token|access.key|api.key"); - static volatile AgentSpan launcherSpan; private static volatile boolean shutdownHookRegistered = false; @@ 
-55,11 +50,7 @@ private static void setLauncherConfigTags(AgentSpan span, Object launcher) { if (conf != null) { for (Map.Entry entry : conf.entrySet()) { if (SparkConfAllowList.canCaptureJobParameter(entry.getKey())) { - String value = entry.getValue(); - if (CONF_REDACTION_PATTERN.matcher(entry.getKey()).find() - || CONF_REDACTION_PATTERN.matcher(value).find()) { - value = "[redacted]"; - } + String value = SparkConfAllowList.redactValue(entry.getKey(), entry.getValue()); span.setTag("config." + entry.getKey().replace('.', '_'), value); } } From f57bf18ccc792b3887f4329edbb91335f8a859f8 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 15:05:49 +0100 Subject: [PATCH 14/20] make public to avoid IllegalAccessError --- .../trace/instrumentation/spark/SparkLauncherAdvice.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index bb596973183..4d6496ad8cd 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -77,7 +77,7 @@ private static void setStringFieldAsTag( } } - static synchronized void createLauncherSpan(Object launcher) { + public static synchronized void createLauncherSpan(Object launcher) { if (launcherSpan != null) { return; } @@ -111,7 +111,7 @@ static synchronized void createLauncherSpan(Object launcher) { } } - static synchronized void finishSpan(boolean isError, String errorType) { + public static synchronized void finishSpan(boolean isError, String errorType) { AgentSpan span = launcherSpan; if (span == null) { return; @@ -124,7 +124,7 @@ static 
synchronized void finishSpan(boolean isError, String errorType) { launcherSpan = null; } - static synchronized void finishSpanWithThrowable(Throwable throwable) { + public static synchronized void finishSpanWithThrowable(Throwable throwable) { AgentSpan span = launcherSpan; if (span == null) { return; @@ -159,7 +159,7 @@ public static void exit( } } - static class AppHandleListener implements SparkAppHandle.Listener { + public static class AppHandleListener implements SparkAppHandle.Listener { @Override public void stateChanged(SparkAppHandle handle) { SparkAppHandle.State state = handle.getState(); From c02607dca45b1ad58c9304b1df306060cdaf0245 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 15:56:20 +0100 Subject: [PATCH 15/20] error type and error message --- .../trace/instrumentation/spark/SparkLauncherAdvice.java | 7 ++++--- .../trace/instrumentation/spark/SparkLauncherTest.groovy | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index 4d6496ad8cd..4f79e553434 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -111,14 +111,15 @@ public static synchronized void createLauncherSpan(Object launcher) { } } - public static synchronized void finishSpan(boolean isError, String errorType) { + public static synchronized void finishSpan(boolean isError, String errorMessage) { AgentSpan span = launcherSpan; if (span == null) { return; } if (isError) { span.setError(true); - span.setTag(DDTags.ERROR_TYPE, errorType); + span.setTag(DDTags.ERROR_TYPE, "Spark Launcher 
Failed"); + span.setTag(DDTags.ERROR_MSG, errorMessage); } span.finish(); launcherSpan = null; @@ -176,7 +177,7 @@ public void stateChanged(SparkAppHandle handle) { if (state == SparkAppHandle.State.FAILED || state == SparkAppHandle.State.KILLED || state == SparkAppHandle.State.LOST) { - finishSpan(true, "Spark Application " + state); + finishSpan(true, "Application " + state); } else { finishSpan(false, null); } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy index 69678fedcc8..cd6d1e1e51a 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy @@ -160,7 +160,8 @@ class SparkLauncherTest extends InstrumentationSpecification { operationName "spark.launcher.launch" spanType "spark" errored true - assert span.tags["error.type"] == "Spark Application FAILED" + assert span.tags["error.type"] == "Spark Launcher Failed" + assert span.tags["error.msg"] == "Application FAILED" assert span.tags["spark.app_id"] == "app-456" } } From 95c6c74e1748449bc435416b1db21ec4789b8846 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 16:30:01 +0100 Subject: [PATCH 16/20] Add appId and stack trace --- .../instrumentation/spark/AbstractSparkInstrumentation.java | 2 ++ .../trace/instrumentation/spark/SparkLauncherAdvice.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java 
index 80c71a4f64b..9e6e9619001 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java @@ -106,6 +106,8 @@ public static void exit(@Advice.Thrown Throwable throwable) { if (AbstractDatadogSparkListener.listener != null) { AbstractDatadogSparkListener.listener.finishApplication( System.currentTimeMillis(), throwable, 0, null); + } else { + SparkLauncherAdvice.finishSpanWithThrowable(throwable); + } } } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java index 4f79e553434..ad8ac099090 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java @@ -171,6 +171,7 @@ public void stateChanged(SparkAppHandle handle) { String appId = handle.getAppId(); if (appId != null) { span.setTag("spark.app_id", appId); + span.setTag("app_id", appId); } if (state.isFinal()) { @@ -192,6 +193,7 @@ public void infoChanged(SparkAppHandle handle) { String appId = handle.getAppId(); if (appId != null) { span.setTag("spark.app_id", appId); + span.setTag("app_id", appId); } } } From 9c973acb2c4dbdbcff7c27762c7390233e11b670 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 17:50:21 +0100 Subject: [PATCH 17/20] extract span building in SparkLauncherListener --- .../spark/AbstractSparkInstrumentation.java | 2 +- .../spark/SparkLauncherInstrumentation.java | 26 ++- ...Advice.java => SparkLauncherListener.java} | 181 ++++++++----
.../spark/SparkLauncherTest.groovy | 48 +++-- .../spark/Spark212Instrumentation.java | 1 + .../spark/Spark213Instrumentation.java | 1 + 6 files changed, 125 insertions(+), 134 deletions(-) rename dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/{SparkLauncherAdvice.java => SparkLauncherListener.java} (67%) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java index 9e6e9619001..ea2c1080a06 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java @@ -107,7 +107,7 @@ public static void exit(@Advice.Thrown Throwable throwable) { AbstractDatadogSparkListener.listener.finishApplication( System.currentTimeMillis(), throwable, 0, null); } else { - SparkLauncherAdvice.finishSpanWithThrowable(throwable); + SparkLauncherListener.finishSpanWithThrowable(throwable); } } } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java index d3d9c1b1e29..3e1b2ffd93c 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java @@ -8,6 +8,8 @@ import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; import 
datadog.trace.api.InstrumenterConfig; +import net.bytebuddy.asm.Advice; +import org.apache.spark.launcher.SparkAppHandle; @AutoService(InstrumenterModule.class) public class SparkLauncherInstrumentation extends InstrumenterModule.Tracing @@ -31,8 +33,7 @@ public String instrumentedType() { public String[] helperClassNames() { return new String[] { packageName + ".SparkConfAllowList", - packageName + ".SparkLauncherAdvice", - packageName + ".SparkLauncherAdvice$AppHandleListener", + packageName + ".SparkLauncherListener", }; } @@ -42,6 +43,25 @@ public void methodAdvice(MethodTransformer transformer) { isMethod() .and(named("startApplication")) .and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))), - packageName + ".SparkLauncherAdvice$StartApplicationAdvice"); + SparkLauncherInstrumentation.class.getName() + "$StartApplicationAdvice"); + } + + public static class StartApplicationAdvice { + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void exit( + @Advice.This Object launcher, + @Advice.Return SparkAppHandle handle, + @Advice.Thrown Throwable throwable) { + SparkLauncherListener.createLauncherSpan(launcher); + + if (throwable != null) { + SparkLauncherListener.finishSpanWithThrowable(throwable); + return; + } + + if (handle != null) { + handle.addListener(new SparkLauncherListener()); + } + } } } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java similarity index 67% rename from dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java rename to dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java index ad8ac099090..c7c54dbfd01 100644 --- 
a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java @@ -7,76 +7,18 @@ import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import java.lang.reflect.Field; import java.util.Map; -import net.bytebuddy.asm.Advice; import org.apache.spark.launcher.SparkAppHandle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SparkLauncherAdvice { +public class SparkLauncherListener implements SparkAppHandle.Listener { - private static final Logger log = LoggerFactory.getLogger(SparkLauncherAdvice.class); + private static final Logger log = LoggerFactory.getLogger(SparkLauncherListener.class); static volatile AgentSpan launcherSpan; private static volatile boolean shutdownHookRegistered = false; - /** Extract SparkLauncher configuration via reflection and set as span tags. 
*/ - private static void setLauncherConfigTags(AgentSpan span, Object launcher) { - try { - // SparkLauncher extends AbstractLauncher which has a 'builder' field - Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder"); - builderField.setAccessible(true); - Object builder = builderField.get(launcher); - if (builder == null) { - return; - } - - Class builderClass = builder.getClass(); - // Fields are on AbstractCommandBuilder (parent of SparkSubmitCommandBuilder) - Class abstractBuilderClass = builderClass.getSuperclass(); - - setStringFieldAsTag(span, builder, abstractBuilderClass, "master", "master"); - setStringFieldAsTag(span, builder, abstractBuilderClass, "deployMode", "deploy_mode"); - setStringFieldAsTag(span, builder, abstractBuilderClass, "appName", "application_name"); - setStringFieldAsTag(span, builder, abstractBuilderClass, "mainClass", "main_class"); - setStringFieldAsTag(span, builder, abstractBuilderClass, "appResource", "app_resource"); - - // Extract spark conf entries and redact sensitive values - try { - Field confField = abstractBuilderClass.getDeclaredField("conf"); - confField.setAccessible(true); - @SuppressWarnings("unchecked") - Map conf = (Map) confField.get(builder); - if (conf != null) { - for (Map.Entry entry : conf.entrySet()) { - if (SparkConfAllowList.canCaptureJobParameter(entry.getKey())) { - String value = SparkConfAllowList.redactValue(entry.getKey(), entry.getValue()); - span.setTag("config." 
+ entry.getKey().replace('.', '_'), value); - } - } - } - } catch (NoSuchFieldException e) { - log.debug("Could not find conf field on builder", e); - } - } catch (Exception e) { - log.debug("Failed to extract SparkLauncher configuration", e); - } - } - - private static void setStringFieldAsTag( - AgentSpan span, Object obj, Class clazz, String fieldName, String tagName) { - try { - Field field = clazz.getDeclaredField(fieldName); - field.setAccessible(true); - Object value = field.get(obj); - if (value != null) { - span.setTag(tagName, value.toString()); - } - } catch (Exception e) { - log.debug("Could not read field {} from builder", fieldName, e); - } - } - public static synchronized void createLauncherSpan(Object launcher) { if (launcherSpan != null) { return; @@ -93,13 +35,14 @@ public static synchronized void createLauncherSpan(Object launcher) { setLauncherConfigTags(span, launcher); launcherSpan = span; + if (!shutdownHookRegistered) { shutdownHookRegistered = true; Runtime.getRuntime() .addShutdownHook( new Thread( () -> { - synchronized (SparkLauncherAdvice.class) { + synchronized (SparkLauncherListener.class) { AgentSpan s = launcherSpan; if (s != null) { log.info("Finishing spark.launcher span from shutdown hook"); @@ -137,65 +80,93 @@ public static synchronized void finishSpanWithThrowable(Throwable throwable) { launcherSpan = null; } - public static class StartApplicationAdvice { - @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) - public static void exit( - @Advice.This Object launcher, - @Advice.Return SparkAppHandle handle, - @Advice.Thrown Throwable throwable) { - createLauncherSpan(launcher); + @Override + public void stateChanged(SparkAppHandle handle) { + SparkAppHandle.State state = handle.getState(); + AgentSpan span = launcherSpan; + if (span != null) { + span.setTag("spark.launcher.app_state", state.toString()); - if (throwable != null) { - finishSpanWithThrowable(throwable); - return; + String appId = 
handle.getAppId(); + if (appId != null) { + span.setTag("spark.app_id", appId); + span.setTag("app_id", appId); } - if (handle != null) { - try { - handle.addListener(new AppHandleListener()); - } catch (Exception e) { - log.debug("Failed to register SparkAppHandle listener", e); + if (state.isFinal()) { + if (state == SparkAppHandle.State.FAILED + || state == SparkAppHandle.State.KILLED + || state == SparkAppHandle.State.LOST) { + finishSpan(true, "Application " + state); + } else { + finishSpan(false, null); } } } } - public static class AppHandleListener implements SparkAppHandle.Listener { - @Override - public void stateChanged(SparkAppHandle handle) { - SparkAppHandle.State state = handle.getState(); - AgentSpan span = launcherSpan; - if (span != null) { - span.setTag("spark.launcher.app_state", state.toString()); - - String appId = handle.getAppId(); - if (appId != null) { - span.setTag("spark.app_id", appId); - span.setTag("app_id", appId); - } + @Override + public void infoChanged(SparkAppHandle handle) { + AgentSpan span = launcherSpan; + if (span != null) { + String appId = handle.getAppId(); + if (appId != null) { + span.setTag("spark.app_id", appId); + span.setTag("app_id", appId); + } + } + } - if (state.isFinal()) { - if (state == SparkAppHandle.State.FAILED - || state == SparkAppHandle.State.KILLED - || state == SparkAppHandle.State.LOST) { - finishSpan(true, "Application " + state); - } else { - finishSpan(false, null); + private static void setLauncherConfigTags(AgentSpan span, Object launcher) { + try { + Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder"); + builderField.setAccessible(true); + Object builder = builderField.get(launcher); + if (builder == null) { + return; + } + + Class builderClass = builder.getClass(); + Class abstractBuilderClass = builderClass.getSuperclass(); + + setStringFieldAsTag(span, builder, abstractBuilderClass, "master", "master"); + setStringFieldAsTag(span, builder, 
abstractBuilderClass, "deployMode", "deploy_mode"); + setStringFieldAsTag(span, builder, abstractBuilderClass, "appName", "application_name"); + setStringFieldAsTag(span, builder, abstractBuilderClass, "mainClass", "main_class"); + setStringFieldAsTag(span, builder, abstractBuilderClass, "appResource", "app_resource"); + + try { + Field confField = abstractBuilderClass.getDeclaredField("conf"); + confField.setAccessible(true); + @SuppressWarnings("unchecked") + Map conf = (Map) confField.get(builder); + if (conf != null) { + for (Map.Entry entry : conf.entrySet()) { + if (SparkConfAllowList.canCaptureJobParameter(entry.getKey())) { + String value = SparkConfAllowList.redactValue(entry.getKey(), entry.getValue()); + span.setTag("config." + entry.getKey().replace('.', '_'), value); + } } } + } catch (NoSuchFieldException e) { + log.debug("Could not find conf field on builder", e); } + } catch (Exception e) { + log.debug("Failed to extract SparkLauncher configuration", e); } + } - @Override - public void infoChanged(SparkAppHandle handle) { - AgentSpan span = launcherSpan; - if (span != null) { - String appId = handle.getAppId(); - if (appId != null) { - span.setTag("spark.app_id", appId); - span.setTag("app_id", appId); - } + private static void setStringFieldAsTag( + AgentSpan span, Object obj, Class clazz, String fieldName, String tagName) { + try { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + Object value = field.get(obj); + if (value != null) { + span.setTag(tagName, value.toString()); } + } catch (Exception e) { + log.debug("Could not read field {} from builder", fieldName, e); } } } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy index cd6d1e1e51a..ca8ec80ddde 100644 --- 
a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy @@ -11,7 +11,7 @@ class SparkLauncherTest extends InstrumentationSpecification { def "StartApplicationAdvice extracts config tags from SparkLauncher"() { setup: - SparkLauncherAdvice.launcherSpan = null + SparkLauncherListener.launcherSpan = null when: def launcher = new SparkLauncher() @@ -22,10 +22,8 @@ class SparkLauncherTest extends InstrumentationSpecification { .setAppResource("/path/to/app.jar") .setConf("spark.executor.memory", "4g") .setConf("spark.executor.instances", "10") - // Call the advice directly (handle=null, throwable=null simulates a successful call - // where no SparkAppHandle is returned — span stays open for shutdown hook) - SparkLauncherAdvice.createLauncherSpan(launcher) - SparkLauncherAdvice.finishSpan(false, null) + SparkLauncherListener.createLauncherSpan(launcher) + SparkLauncherListener.finishSpan(false, null) then: assertTraces(1) { @@ -49,7 +47,7 @@ class SparkLauncherTest extends InstrumentationSpecification { def "StartApplicationAdvice redacts sensitive conf values"() { setup: - SparkLauncherAdvice.launcherSpan = null + SparkLauncherListener.launcherSpan = null when: def launcher = new SparkLauncher() @@ -57,8 +55,8 @@ class SparkLauncherTest extends InstrumentationSpecification { .setConf("spark.app.name", "my-secret-app") // spark.master is allowlisted; its value is harmless so should pass through .setConf("spark.master", "yarn") - SparkLauncherAdvice.createLauncherSpan(launcher) - SparkLauncherAdvice.finishSpan(false, null) + SparkLauncherListener.createLauncherSpan(launcher) + SparkLauncherListener.finishSpan(false, null) then: assertTraces(1) { @@ -77,15 +75,15 @@ class SparkLauncherTest extends InstrumentationSpecification { def "finishSpanWithThrowable finishes span 
with error"() { setup: - SparkLauncherAdvice.launcherSpan = null + SparkLauncherListener.launcherSpan = null when: def launcher = new SparkLauncher().setAppName("test-app") - SparkLauncherAdvice.createLauncherSpan(launcher) - SparkLauncherAdvice.finishSpanWithThrowable(new RuntimeException("startApplication failed")) + SparkLauncherListener.createLauncherSpan(launcher) + SparkLauncherListener.finishSpanWithThrowable(new RuntimeException("startApplication failed")) then: - SparkLauncherAdvice.launcherSpan == null + SparkLauncherListener.launcherSpan == null assertTraces(1) { trace(1) { span { @@ -97,19 +95,19 @@ class SparkLauncherTest extends InstrumentationSpecification { } } - def "AppHandleListener finishes span on final state FINISHED"() { + def "SparkLauncherListener finishes span on final state FINISHED"() { setup: - SparkLauncherAdvice.launcherSpan = null + SparkLauncherListener.launcherSpan = null def tracer = AgentTracer.get() - SparkLauncherAdvice.launcherSpan = tracer + SparkLauncherListener.launcherSpan = tracer .buildSpan("spark.launcher.launch") .withSpanType("spark") .withResourceName("SparkLauncher.startApplication") .start() - SparkLauncherAdvice.launcherSpan.setSamplingPriority( + SparkLauncherListener.launcherSpan.setSamplingPriority( PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS) - def listener = new SparkLauncherAdvice.AppHandleListener() + def listener = new SparkLauncherListener() def handle = Mock(SparkAppHandle) when: @@ -118,7 +116,7 @@ class SparkLauncherTest extends InstrumentationSpecification { listener.stateChanged(handle) then: - SparkLauncherAdvice.launcherSpan == null + SparkLauncherListener.launcherSpan == null assertTraces(1) { trace(1) { span { @@ -132,19 +130,19 @@ class SparkLauncherTest extends InstrumentationSpecification { } } - def "AppHandleListener finishes span with error on FAILED state"() { + def "SparkLauncherListener finishes span with error on FAILED state"() { setup: - SparkLauncherAdvice.launcherSpan = 
null + SparkLauncherListener.launcherSpan = null def tracer = AgentTracer.get() - SparkLauncherAdvice.launcherSpan = tracer + SparkLauncherListener.launcherSpan = tracer .buildSpan("spark.launcher.launch") .withSpanType("spark") .withResourceName("SparkLauncher.startApplication") .start() - SparkLauncherAdvice.launcherSpan.setSamplingPriority( + SparkLauncherListener.launcherSpan.setSamplingPriority( PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS) - def listener = new SparkLauncherAdvice.AppHandleListener() + def listener = new SparkLauncherListener() def handle = Mock(SparkAppHandle) when: @@ -153,7 +151,7 @@ class SparkLauncherTest extends InstrumentationSpecification { listener.stateChanged(handle) then: - SparkLauncherAdvice.launcherSpan == null + SparkLauncherListener.launcherSpan == null assertTraces(1) { trace(1) { span { @@ -161,7 +159,7 @@ class SparkLauncherTest extends InstrumentationSpecification { spanType "spark" errored true assert span.tags["error.type"] == "Spark Launcher Failed" - assert span.tags["error.msg"] == "Application FAILED" + assert span.tags["error.message"] == "Application FAILED" assert span.tags["spark.app_id"] == "app-456" } } diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java index 4cd23089cd1..7b00f04b30c 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java @@ -32,6 +32,7 @@ public String[] helperClassNames() { packageName + ".RemoveEldestHashMap", packageName + ".SparkAggregatedTaskMetrics", packageName + ".SparkConfAllowList", + packageName + ".SparkLauncherListener", packageName + 
".SparkSQLUtils", packageName + ".SparkSQLUtils$SparkPlanInfoForStage", packageName + ".SparkSQLUtils$AccumulatorWithStage", diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java index c9e534f6429..eda0436f464 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java @@ -32,6 +32,7 @@ public String[] helperClassNames() { packageName + ".RemoveEldestHashMap", packageName + ".SparkAggregatedTaskMetrics", packageName + ".SparkConfAllowList", + packageName + ".SparkLauncherListener", packageName + ".SparkSQLUtils", packageName + ".SparkSQLUtils$SparkPlanInfoForStage", packageName + ".SparkSQLUtils$AccumulatorWithStage", From 4b9b2257f829d04dd35730264a7ac484793be249 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 20 Feb 2026 19:05:14 +0100 Subject: [PATCH 18/20] wait for throwable and let the span be finished by shutdown hook --- .../spark/SparkLauncherListener.java | 7 +++- .../spark/SparkLauncherTest.groovy | 41 +++++++++++++++++-- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java index c7c54dbfd01..1b784bb6bba 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java 
@@ -97,7 +97,12 @@ public void stateChanged(SparkAppHandle handle) { if (state == SparkAppHandle.State.FAILED || state == SparkAppHandle.State.KILLED || state == SparkAppHandle.State.LOST) { - finishSpan(true, "Application " + state); + // Set error tags but don't finish yet — RunMainAdvice may add the throwable + // with the full stack trace. The span will be finished by RunMainAdvice or + // the shutdown hook. + span.setError(true); + span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed"); + span.setTag(DDTags.ERROR_MSG, "Application " + state); } else { finishSpan(false, null); } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy index ca8ec80ddde..f7a5b5a9601 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy @@ -130,7 +130,7 @@ class SparkLauncherTest extends InstrumentationSpecification { } } - def "SparkLauncherListener finishes span with error on FAILED state"() { + def "SparkLauncherListener sets error tags on FAILED state but does not finish span"() { setup: SparkLauncherListener.launcherSpan = null def tracer = AgentTracer.get() @@ -150,6 +150,40 @@ class SparkLauncherTest extends InstrumentationSpecification { handle.getAppId() >> "app-456" listener.stateChanged(handle) + then: + // Span stays open so RunMainAdvice can add the throwable + SparkLauncherListener.launcherSpan != null + SparkLauncherListener.launcherSpan.isError() + SparkLauncherListener.launcherSpan.getTags()["error.type"] == "Spark Launcher Failed" + SparkLauncherListener.launcherSpan.getTags()["error.message"] == "Application FAILED" + 
SparkLauncherListener.launcherSpan.getTags()["spark.app_id"] == "app-456" + + cleanup: + SparkLauncherListener.finishSpan(false, null) + } + + def "finishSpanWithThrowable adds stack trace after FAILED state"() { + setup: + SparkLauncherListener.launcherSpan = null + def tracer = AgentTracer.get() + SparkLauncherListener.launcherSpan = tracer + .buildSpan("spark.launcher.launch") + .withSpanType("spark") + .withResourceName("SparkLauncher.startApplication") + .start() + SparkLauncherListener.launcherSpan.setSamplingPriority( + PrioritySampling.USER_KEEP, + SamplingMechanism.DATA_JOBS) + def listener = new SparkLauncherListener() + def handle = Mock(SparkAppHandle) + + when: + // Simulate: listener sets error tags, then RunMainAdvice finishes with throwable + handle.getState() >> SparkAppHandle.State.FAILED + handle.getAppId() >> "app-456" + listener.stateChanged(handle) + SparkLauncherListener.finishSpanWithThrowable(new RuntimeException("job crashed")) + then: SparkLauncherListener.launcherSpan == null assertTraces(1) { @@ -158,8 +192,9 @@ class SparkLauncherTest extends InstrumentationSpecification { operationName "spark.launcher.launch" spanType "spark" errored true - assert span.tags["error.type"] == "Spark Launcher Failed" - assert span.tags["error.message"] == "Application FAILED" + assert span.tags["error.type"] == "java.lang.RuntimeException" + assert span.tags["error.message"] == "job crashed" + assert span.tags["error.stack"] != null assert span.tags["spark.app_id"] == "app-456" } } From 64343941532a37f5e80a0685778456ef29eb258b Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Sat, 21 Feb 2026 21:47:16 +0100 Subject: [PATCH 19/20] spotless apply --- .../instrumentation/spark/SparkLauncherInstrumentation.java | 3 +-- .../trace/instrumentation/spark/SparkLauncherListener.java | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git 
a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java index 3e1b2ffd93c..d072aa89db6 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java @@ -32,8 +32,7 @@ public String instrumentedType() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".SparkConfAllowList", - packageName + ".SparkLauncherListener", + packageName + ".SparkConfAllowList", packageName + ".SparkLauncherListener", }; } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java index 1b784bb6bba..ba7b1d5376c 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java @@ -35,7 +35,6 @@ public static synchronized void createLauncherSpan(Object launcher) { setLauncherConfigTags(span, launcher); launcherSpan = span; - if (!shutdownHookRegistered) { shutdownHookRegistered = true; Runtime.getRuntime() From 7f4844a092a5b44c9bfe063ce0b2a938833af1fe Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Mon, 23 Feb 2026 10:11:08 +0100 Subject: [PATCH 20/20] fix span lifecycle in the launcher listener --- .../spark/SparkLauncherListener.java | 72 +++++++++++-------- 1 file changed, 44 insertions(+), 28 deletions(-) diff 
--git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java index ba7b1d5376c..8fac8a092d6 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java @@ -11,6 +11,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Listener for SparkLauncher spans. Tracks the lifecycle of a Spark application submitted via + * SparkLauncher.startApplication(). Only a single launcher span can be active at a time. Subsequent + * calls to startApplication() from the same or different launcher instances will not create spans; + * only the first launch in the JVM is traced + */ public class SparkLauncherListener implements SparkAppHandle.Listener { private static final Logger log = LoggerFactory.getLogger(SparkLauncherListener.class); @@ -81,29 +87,31 @@ public static synchronized void finishSpanWithThrowable(Throwable throwable) { @Override public void stateChanged(SparkAppHandle handle) { - SparkAppHandle.State state = handle.getState(); - AgentSpan span = launcherSpan; - if (span != null) { - span.setTag("spark.launcher.app_state", state.toString()); - - String appId = handle.getAppId(); - if (appId != null) { - span.setTag("spark.app_id", appId); - span.setTag("app_id", appId); - } + synchronized (SparkLauncherListener.class) { + SparkAppHandle.State state = handle.getState(); + AgentSpan span = launcherSpan; + if (span != null) { + span.setTag("spark.launcher.app_state", state.toString()); + + String appId = handle.getAppId(); + if (appId != null) { + span.setTag("spark.app_id", appId); + span.setTag("app_id", appId); + } - if (state.isFinal()) { - if 
(state == SparkAppHandle.State.FAILED - || state == SparkAppHandle.State.KILLED - || state == SparkAppHandle.State.LOST) { - // Set error tags but don't finish yet — RunMainAdvice may add the throwable - // with the full stack trace. The span will be finished by RunMainAdvice or - // the shutdown hook. - span.setError(true); - span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed"); - span.setTag(DDTags.ERROR_MSG, "Application " + state); - } else { - finishSpan(false, null); + if (state.isFinal()) { + if (state == SparkAppHandle.State.FAILED + || state == SparkAppHandle.State.KILLED + || state == SparkAppHandle.State.LOST) { + // Set error tags but don't finish yet — RunMainAdvice may add the throwable + // with the full stack trace. The span will be finished by RunMainAdvice or + // the shutdown hook. + span.setError(true); + span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed"); + span.setTag(DDTags.ERROR_MSG, "Application " + state); + } else { + finishSpan(false, null); + } } } } @@ -111,16 +119,24 @@ public void stateChanged(SparkAppHandle handle) { @Override public void infoChanged(SparkAppHandle handle) { - AgentSpan span = launcherSpan; - if (span != null) { - String appId = handle.getAppId(); - if (appId != null) { - span.setTag("spark.app_id", appId); - span.setTag("app_id", appId); + synchronized (SparkLauncherListener.class) { + AgentSpan span = launcherSpan; + if (span != null) { + String appId = handle.getAppId(); + if (appId != null) { + span.setTag("spark.app_id", appId); + span.setTag("app_id", appId); + } } } } + /** + * Extract launcher configuration via reflection and set as span tags. Secret redaction uses the + * default pattern only (not spark.redaction.regex) because the SparkLauncher conf map is a plain + * Map, not a SparkConf, so there is no way to read the user's custom redaction regex at this + * point. 
+ */ private static void setLauncherConfigTags(AgentSpan span, Object launcher) { try { Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder");