Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import datadog.trace.agent.test.AbstractInstrumentationTest;
import datadog.trace.api.Trace;
import datadog.trace.core.DDSpan;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.Test;

/**
* Reproductions for the executor swap profiling-backend PR#8520 made for OkHttp's Dispatcher
* (cached thread pool → {@code Executors.newThreadPerTaskExecutor(Thread.ofVirtual()...)}).
*
* <p>Each test asserts that an {@code @Trace}-annotated child method invoked from inside a task
* submitted to the virtual-thread-per-task executor attaches to the parent span that was active at
* submission time. If propagation breaks, the child either lands in a different trace or floats
* free as a root span and the {@code assertEquals(parentSpanId, childSpan.getParentId())} check
* fails.
*
* <p>This mirrors what would happen with OkHttp: the {@code okhttp} client span is created by
* {@code TracingInterceptor.intercept()} on the executor thread using {@code activeSpan()}, so "is
* the parent scope active on the worker?" is the entire question.
*/
class ThreadPerTaskExecutorVirtualThreadTest extends AbstractInstrumentationTest {

@Trace(operationName = "parent")
void underParentTrace(int expectedChildren, Runnable body) {
body.run();
blockUntilChildSpansFinished(expectedChildren);
}

@Trace(operationName = "child")
static void child() {}

@Test
void virtualThreadFactory_propagatesActiveScope() throws Exception {
ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory());
try {
underParentTrace(1, () -> executor.execute(ThreadPerTaskExecutorVirtualThreadTest::child));
} finally {
executor.shutdown();
}

writer.waitForTraces(1);
List<DDSpan> trace = writer.get(0);
assertEquals(2, trace.size(), "expected parent + child span");
DDSpan parentSpan = findByOp(trace, "parent");
DDSpan childSpan = findByOp(trace, "child");
assertNotNull(parentSpan, "parent span should exist");
assertNotNull(childSpan, "child span should exist");
assertEquals(
parentSpan.getSpanId(),
childSpan.getParentId(),
"child span should be parented under the active span at executor.execute()");
}

@Test
void namedVirtualThreadFactory_propagatesActiveScope() throws Exception {
// Exact builder shape from profiling-backend PR#8520:
// Thread.ofVirtual().name("okhttp-" + track + "-", 0).factory()
ExecutorService executor =
Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("okhttp-test-", 0).factory());
try {
underParentTrace(1, () -> executor.execute(ThreadPerTaskExecutorVirtualThreadTest::child));
} finally {
executor.shutdown();
}

writer.waitForTraces(1);
List<DDSpan> trace = writer.get(0);
DDSpan parentSpan = findByOp(trace, "parent");
DDSpan childSpan = findByOp(trace, "child");
assertNotNull(parentSpan, "parent span should exist");
assertNotNull(childSpan, "child span should exist");
assertEquals(
parentSpan.getSpanId(),
childSpan.getParentId(),
"the .name(prefix, start) builder should not break propagation");
}

/**
* Mirrors OkHttp's dispatcher recursion: a task running on the executor submits more work back to
* the same executor. In OkHttp this is {@code Dispatcher.finished() &rarr; promoteAndExecute()
* &rarr; executorService.execute(nextAsyncCall)}, called from inside an {@code AsyncCall.run()}
* on a dispatcher thread.
*
* <p>Both child spans should land in the parent's trace. If the worker thread loses the parent
* scope between the outer activation and the inner submission, the second child either becomes a
* new root span or gets attached to the wrong sibling.
*/
@Test
void recursiveSubmissionFromWorkerThread_keepsTraceConnected() throws Exception {
ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory());
try {
underParentTrace(
2,
() ->
executor.execute(
() -> {
child();
executor.execute(ThreadPerTaskExecutorVirtualThreadTest::child);
}));
} finally {
executor.shutdown();
}

writer.waitForTraces(1);
List<DDSpan> trace = writer.get(0);
DDSpan parentSpan = findByOp(trace, "parent");
assertNotNull(parentSpan, "parent span should exist");
long parentTraceId = parentSpan.getTraceId().toLong();
long parentSpanId = parentSpan.getSpanId();

long childCount =
trace.stream().filter(s -> "child".contentEquals(s.getOperationName())).count();
assertEquals(2, childCount, "both child spans should land in the same trace as the parent");

trace.stream()
.filter(s -> "child".contentEquals(s.getOperationName()))
.forEach(
s -> {
assertEquals(
parentTraceId,
s.getTraceId().toLong(),
"child span must share the parent's trace");
assertEquals(
parentSpanId,
s.getParentId(),
"child span must attach to the parent, not float free");
});
}

/**
* Forces the virtual thread to unmount and remount mid-task by sleeping between two child spans.
* This is the case OkHttp actually hits in practice &mdash; every blocking socket read unmounts
* the carrier, and the scope stack has to be reinstated by {@code VirtualThreadInstrumentation}
* on each remount. The fix log for this area is long (#10931, #11009, #11111), so it is worth
* pinning down with a regression test.
*/
@Test
void virtualThreadFactory_propagatesAcrossUnmount() throws Exception {
ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory());
try {
underParentTrace(
2,
() ->
executor.execute(
() -> {
child();
try {
// Sleep is a JDK 21+ carrier-unmounting parking point for virtual threads.
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
child();
}));
} finally {
executor.shutdown();
}

writer.waitForTraces(1);
List<DDSpan> trace = writer.get(0);
DDSpan parentSpan = findByOp(trace, "parent");
assertNotNull(parentSpan, "parent span should exist");
long parentSpanId = parentSpan.getSpanId();

long childCount =
trace.stream().filter(s -> "child".contentEquals(s.getOperationName())).count();
assertEquals(2, childCount, "both child spans should be captured");

trace.stream()
.filter(s -> "child".contentEquals(s.getOperationName()))
.forEach(
s ->
assertEquals(
parentSpanId,
s.getParentId(),
"child span before and after virtual-thread unmount must both attach to parent"));
}

private static DDSpan findByOp(List<DDSpan> spans, String op) {
return spans.stream()
.filter(s -> op.contentEquals(s.getOperationName()))
.findFirst()
.orElse(null);
}
}
31 changes: 31 additions & 0 deletions dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,30 @@ muzzle {
}

apply from: "$rootDir/gradle/java.gradle"
// Use slf4j-simple for tests; logback's synchronized appenders can pin virtual-thread carriers
// when many vthreads log concurrently, which deadlocks this module's vthread21Test suite.
apply from: "$rootDir/gradle/slf4j-simple.gradle"

addTestSuiteForDir('latestDepTest', 'test')

// Separate suite for tests that need the JDK 21+ virtual-thread API. Kept apart from the
// default test suite so that the existing OkHttp 3.0 tests keep their Java 1.8 baseline.
addTestSuite('vthread21Test')

tasks.named("compileVthread21TestJava", JavaCompile) {
configureCompiler(it, 21)
}

tasks.named("vthread21Test", Test) {
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.of(21)
}
Comment on lines +25 to +28
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Add JDK 21 constraints for the virtual-thread suite

This suite is forced to run with a JDK 21 launcher, but it does not declare a task-specific testJvmConstraints { minJavaVersion = JavaVersion.VERSION_21 }. The repo's test-JVM plugin gates tasks from the requested -PtestJvm, not from a later launcher override, so lower-JDK shards such as -PtestJvm=8/11 still consider vthread21Test allowed and then run or resolve this JDK 21-only suite from check. Other per-suite JDK 21 tests in this repo add the constraint on the Test task; doing the same here keeps lower-JDK checks from unexpectedly executing this virtual-thread coverage.

Useful? React with 👍 / 👎.

}

tasks.named("check") {
dependsOn "vthread21Test"
}

dependencies {
compileOnly(group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.0.0')

Expand Down Expand Up @@ -38,4 +59,14 @@ dependencies {

testRuntimeOnly(project(':dd-java-agent:instrumentation:datadog:asm:iast-instrumenter'))
testRuntimeOnly(project(':dd-java-agent:instrumentation:java:java-net:java-net-1.8'))

// vthread21Test inherits testImplementation via addTestSuite's extendsFrom wiring.
vthread21TestImplementation(group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.11.0')
vthread21TestImplementation(group: 'com.squareup.okio', name: 'okio', version: '1.14.0')

// Pull in the JDK-21+ concurrent / lang instrumentations so the test installs the same
// TaskRunnerInstrumentation + VirtualThreadInstrumentation chain that profiling-backend
// exercises in production.
vthread21TestRuntimeOnly project(':dd-java-agent:instrumentation:java:java-concurrent:java-concurrent-21.0')
vthread21TestRuntimeOnly project(':dd-java-agent:instrumentation:java:java-lang:java-lang-21.0')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package datadog.trace.instrumentation.okhttp3;

import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Map;
import net.bytebuddy.asm.Advice;

/**
* Captures the active scope at {@code AsyncCall.<init>} (i.e. the moment {@code
* Call.enqueue(callback)} was invoked on the user's thread) and stores it in the {@code
* ContextStore<Runnable, State>} shared with the rest of the concurrent instrumentation. {@code
* RunnableInstrumentation} then re-activates that scope on {@code AsyncCall.run()} entry, which
* overrides whatever scope {@code TaskRunner.run()} (or {@code beforeExecute}) put in place from
* the dispatcher's worker thread.
*
* <p>Without this, {@code TaskRunnerInstrumentation} captures whatever scope happens to be active
* on the worker thread when {@code Dispatcher.promoteAndExecute()} dequeues and submits the call —
* and when promotion runs from inside {@code Dispatcher.finished()} (i.e. recursively from a
* <em>different</em> AsyncCall's run()), that scope belongs to the finishing call, not to the
* caller who actually enqueued this AsyncCall. Result: under concurrent OkHttp load, {@code
* okhttp.request} spans cross-contaminate between traces.
*
* <p>The class moved between OkHttp 3.x and 4.x:
*
* <ul>
* <li>OkHttp 3.x &mdash; {@code okhttp3.RealCall$AsyncCall}
* <li>OkHttp 4.x &mdash; {@code okhttp3.internal.connection.RealCall$AsyncCall}
* </ul>
*
* Both are inner classes of {@code RealCall} and both transitively implement {@link Runnable}.
*/
@AutoService(InstrumenterModule.class)
public final class AsyncCallInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {

public AsyncCallInstrumentation() {
// Re-use the existing "okhttp" / "okhttp-3" instrumentation names so we don't introduce a
// separately-toggleable feature flag (DD_TRACE_OKHTTP_ASYNC_CALL_ENABLED). The capture here
// is conceptually part of the OkHttp instrumentation — if you disable OkHttp tracing, you
// also disable this capture, which is the right behavior.
super("okhttp", "okhttp-3");
}

@Override
public String[] knownMatchingTypes() {
return new String[] {
"okhttp3.RealCall$AsyncCall", // OkHttp 3.x
"okhttp3.internal.connection.RealCall$AsyncCall", // OkHttp 4.x
};
}

@Override
public Map<String, String> contextStore() {
// Same Runnable -> State store that RunnableInstrumentation reads from.
return singletonMap("java.lang.Runnable", State.class.getName());
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
}

public static final class Construct {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void captureScope(@Advice.This Runnable asyncCall) {
// AdviceUtils.capture is a no-op when async propagation is disabled or there's no active
// span — same behavior as the rest of the concurrent instrumentation.
capture(InstrumentationContext.get(Runnable.class, State.class), asyncCall);
}
}
}
Loading
Loading