From 8f5eb8a9e30153f9a23079e75c398f54f510feb8 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Wed, 27 May 2026 14:33:26 -0400 Subject: [PATCH] fix(okhttp): capture scope at AsyncCall. to keep traces intact under virtual-thread Dispatcher load OkHttp's Dispatcher recursively promotes queued AsyncCalls from inside finished(), running on a worker thread that has a *different* call's parent scope active. The existing TaskRunnerInstrumentation captures that wrong scope when the promoted call is executed, so under a virtual-thread-per-task Dispatcher (or any executor where the same recursion exposes the issue) okhttp.request spans cross-contaminate between concurrent caller traces. Single-shot requests don't trigger this because nothing is queued. Fix: add AsyncCallInstrumentation that captures the active scope at AsyncCall. (i.e. the user thread that ran enqueue(), where the right parent is active) and stores it in the shared ContextStore. RunnableInstrumentation then activates that state on AsyncCall.run() entry, overriding whatever scope TaskRunner inherited from the dispatcher-recursion path. Covers both class locations (3.x and 4.x). Muzzle passes against 24 OkHttp versions in [3.0.0, 5.3.2]. All 168 pre-existing forked OkHttp tests still pass. Tests added: * okhttp-3.0/src/vthread21Test/.../OkHttpVirtualThreadDispatcherTest: concurrent-burst test (16 parents x 4 requests through a virtual- thread Dispatcher with capacity 4) plus two single-shot baselines. The concurrent-burst test fails on master and v1.61.1 without the fix and passes with it. * java-concurrent-21.0/src/test/java/ThreadPerTaskExecutorVirtualThreadTest: pure-executor regression coverage for Thread.ofVirtual().factory(), the .name(prefix, start) builder variant, recursive submission, and propagation across virtual-thread unmount. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...hreadPerTaskExecutorVirtualThreadTest.java | 190 +++++++++++ .../okhttp/okhttp-3.0/build.gradle | 31 ++ .../okhttp3/AsyncCallInstrumentation.java | 78 +++++ .../OkHttpVirtualThreadDispatcherTest.java | 321 ++++++++++++++++++ 4 files changed, 620 insertions(+) create mode 100644 dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-21.0/src/test/java/ThreadPerTaskExecutorVirtualThreadTest.java create mode 100644 dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/main/java/datadog/trace/instrumentation/okhttp3/AsyncCallInstrumentation.java create mode 100644 dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-21.0/src/test/java/ThreadPerTaskExecutorVirtualThreadTest.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-21.0/src/test/java/ThreadPerTaskExecutorVirtualThreadTest.java new file mode 100644 index 00000000000..53f7a348a44 --- /dev/null +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-21.0/src/test/java/ThreadPerTaskExecutorVirtualThreadTest.java @@ -0,0 +1,190 @@ +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.api.Trace; +import datadog.trace.core.DDSpan; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.jupiter.api.Test; + +/** + * Reproductions for the executor swap profiling-backend PR#8520 made for OkHttp's Dispatcher + * (cached thread pool → {@code Executors.newThreadPerTaskExecutor(Thread.ofVirtual()...)}). + * + *

Each test asserts that an {@code @Trace}-annotated child method invoked from inside a task + * submitted to the virtual-thread-per-task executor attaches to the parent span that was active at + * submission time. If propagation breaks, the child either lands in a different trace or floats + * free as a root span and the {@code assertEquals(parentSpanId, childSpan.getParentId())} check + * fails. + * + *

This mirrors what would happen with OkHttp: the {@code okhttp} client span is created by + * {@code TracingInterceptor.intercept()} on the executor thread using {@code activeSpan()}, so "is + * the parent scope active on the worker?" is the entire question. + */ +class ThreadPerTaskExecutorVirtualThreadTest extends AbstractInstrumentationTest { + + @Trace(operationName = "parent") + void underParentTrace(int expectedChildren, Runnable body) { + body.run(); + blockUntilChildSpansFinished(expectedChildren); + } + + @Trace(operationName = "child") + static void child() {} + + @Test + void virtualThreadFactory_propagatesActiveScope() throws Exception { + ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory()); + try { + underParentTrace(1, () -> executor.execute(ThreadPerTaskExecutorVirtualThreadTest::child)); + } finally { + executor.shutdown(); + } + + writer.waitForTraces(1); + List trace = writer.get(0); + assertEquals(2, trace.size(), "expected parent + child span"); + DDSpan parentSpan = findByOp(trace, "parent"); + DDSpan childSpan = findByOp(trace, "child"); + assertNotNull(parentSpan, "parent span should exist"); + assertNotNull(childSpan, "child span should exist"); + assertEquals( + parentSpan.getSpanId(), + childSpan.getParentId(), + "child span should be parented under the active span at executor.execute()"); + } + + @Test + void namedVirtualThreadFactory_propagatesActiveScope() throws Exception { + // Exact builder shape from profiling-backend PR#8520: + // Thread.ofVirtual().name("okhttp-" + track + "-", 0).factory() + ExecutorService executor = + Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("okhttp-test-", 0).factory()); + try { + underParentTrace(1, () -> executor.execute(ThreadPerTaskExecutorVirtualThreadTest::child)); + } finally { + executor.shutdown(); + } + + writer.waitForTraces(1); + List trace = writer.get(0); + DDSpan parentSpan = findByOp(trace, "parent"); + DDSpan childSpan = findByOp(trace, "child"); + assertNotNull(parentSpan, "parent span should exist"); + assertNotNull(childSpan, "child span should exist"); + assertEquals( + parentSpan.getSpanId(), + childSpan.getParentId(), + "the .name(prefix, start) builder should not break propagation"); + } + + /** + * Mirrors OkHttp's dispatcher recursion: a task running on the executor submits more work back to + * the same executor. In OkHttp this is {@code Dispatcher.finished() → promoteAndExecute() + * → executorService.execute(nextAsyncCall)}, called from inside an {@code AsyncCall.run()} + * on a dispatcher thread. + * + *

Both child spans should land in the parent's trace. If the worker thread loses the parent + * scope between the outer activation and the inner submission, the second child either becomes a + * new root span or gets attached to the wrong sibling. + */ + @Test + void recursiveSubmissionFromWorkerThread_keepsTraceConnected() throws Exception { + ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory()); + try { + underParentTrace( + 2, + () -> + executor.execute( + () -> { + child(); + executor.execute(ThreadPerTaskExecutorVirtualThreadTest::child); + })); + } finally { + executor.shutdown(); + } + + writer.waitForTraces(1); + List trace = writer.get(0); + DDSpan parentSpan = findByOp(trace, "parent"); + assertNotNull(parentSpan, "parent span should exist"); + long parentTraceId = parentSpan.getTraceId().toLong(); + long parentSpanId = parentSpan.getSpanId(); + + long childCount = + trace.stream().filter(s -> "child".contentEquals(s.getOperationName())).count(); + assertEquals(2, childCount, "both child spans should land in the same trace as the parent"); + + trace.stream() + .filter(s -> "child".contentEquals(s.getOperationName())) + .forEach( + s -> { + assertEquals( + parentTraceId, + s.getTraceId().toLong(), + "child span must share the parent's trace"); + assertEquals( + parentSpanId, + s.getParentId(), + "child span must attach to the parent, not float free"); + }); + } + + /** + * Forces the virtual thread to unmount and remount mid-task by sleeping between two child spans. + * This is the case OkHttp actually hits in practice — every blocking socket read unmounts + * the carrier, and the scope stack has to be reinstated by {@code VirtualThreadInstrumentation} + * on each remount. The fix log for this area is long (#10931, #11009, #11111), so it is worth + * pinning down with a regression test. + */ + @Test + void virtualThreadFactory_propagatesAcrossUnmount() throws Exception { + ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory()); + try { + underParentTrace( + 2, + () -> + executor.execute( + () -> { + child(); + try { + // Sleep is a JDK 21+ carrier-unmounting parking point for virtual threads. + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + child(); + })); + } finally { + executor.shutdown(); + } + + writer.waitForTraces(1); + List trace = writer.get(0); + DDSpan parentSpan = findByOp(trace, "parent"); + assertNotNull(parentSpan, "parent span should exist"); + long parentSpanId = parentSpan.getSpanId(); + + long childCount = + trace.stream().filter(s -> "child".contentEquals(s.getOperationName())).count(); + assertEquals(2, childCount, "both child spans should be captured"); + + trace.stream() + .filter(s -> "child".contentEquals(s.getOperationName())) + .forEach( + s -> + assertEquals( + parentSpanId, + s.getParentId(), + "child span before and after virtual-thread unmount must both attach to parent")); + } + + private static DDSpan findByOp(List spans, String op) { + return spans.stream() + .filter(s -> op.contentEquals(s.getOperationName())) + .findFirst() + .orElse(null); + } +} diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle index 5ce7c997f99..e2ce1db8684 100644 --- a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle +++ b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle @@ -8,9 +8,30 @@ muzzle { } apply from: "$rootDir/gradle/java.gradle" +// Use slf4j-simple for tests; logback's synchronized appenders can pin virtual-thread carriers +// when many vthreads log concurrently, which deadlocks this module's vthread21Test suite. +apply from: "$rootDir/gradle/slf4j-simple.gradle" addTestSuiteForDir('latestDepTest', 'test') +// Separate suite for tests that need the JDK 21+ virtual-thread API. Kept apart from the +// default test suite so that the existing OkHttp 3.0 tests keep their Java 1.8 baseline. +addTestSuite('vthread21Test') + +tasks.named("compileVthread21TestJava", JavaCompile) { + configureCompiler(it, 21) +} + +tasks.named("vthread21Test", Test) { + javaLauncher = javaToolchains.launcherFor { + languageVersion = JavaLanguageVersion.of(21) + } +} + +tasks.named("check") { + dependsOn "vthread21Test" +} + dependencies { compileOnly(group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.0.0') @@ -38,4 +59,14 @@ dependencies { testRuntimeOnly(project(':dd-java-agent:instrumentation:datadog:asm:iast-instrumenter')) testRuntimeOnly(project(':dd-java-agent:instrumentation:java:java-net:java-net-1.8')) + + // vthread21Test inherits testImplementation via addTestSuite's extendsFrom wiring. + vthread21TestImplementation(group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.11.0') + vthread21TestImplementation(group: 'com.squareup.okio', name: 'okio', version: '1.14.0') + + // Pull in the JDK-21+ concurrent / lang instrumentations so the test installs the same + // TaskRunnerInstrumentation + VirtualThreadInstrumentation chain that profiling-backend + // exercises in production. + vthread21TestRuntimeOnly project(':dd-java-agent:instrumentation:java:java-concurrent:java-concurrent-21.0') + vthread21TestRuntimeOnly project(':dd-java-agent:instrumentation:java:java-lang:java-lang-21.0') } diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/main/java/datadog/trace/instrumentation/okhttp3/AsyncCallInstrumentation.java b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/main/java/datadog/trace/instrumentation/okhttp3/AsyncCallInstrumentation.java new file mode 100644 index 00000000000..0d9d9295b99 --- /dev/null +++ b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/main/java/datadog/trace/instrumentation/okhttp3/AsyncCallInstrumentation.java @@ -0,0 +1,78 @@ +package datadog.trace.instrumentation.okhttp3; + +import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import java.util.Map; +import net.bytebuddy.asm.Advice; + +/** + * Captures the active scope at {@code AsyncCall.} (i.e. the moment {@code + * Call.enqueue(callback)} was invoked on the user's thread) and stores it in the {@code + * ContextStore} shared with the rest of the concurrent instrumentation. {@code + * RunnableInstrumentation} then re-activates that scope on {@code AsyncCall.run()} entry, which + * overrides whatever scope {@code TaskRunner.run()} (or {@code beforeExecute}) put in place from + * the dispatcher's worker thread. + * + *

Without this, {@code TaskRunnerInstrumentation} captures whatever scope happens to be active + * on the worker thread when {@code Dispatcher.promoteAndExecute()} dequeues and submits the call — + * and when promotion runs from inside {@code Dispatcher.finished()} (i.e. recursively from a + * different AsyncCall's run()), that scope belongs to the finishing call, not to the + * caller who actually enqueued this AsyncCall. Result: under concurrent OkHttp load, {@code + * okhttp.request} spans cross-contaminate between traces. + * + *

The class moved between OkHttp 3.x and 4.x: + * + *

    + *
  • OkHttp 3.x — {@code okhttp3.RealCall$AsyncCall} + *
  • OkHttp 4.x — {@code okhttp3.internal.connection.RealCall$AsyncCall} + *
+ * + * Both are inner classes of {@code RealCall} and both transitively implement {@link Runnable}. + */ +@AutoService(InstrumenterModule.class) +public final class AsyncCallInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice { + + public AsyncCallInstrumentation() { + // Re-use the existing "okhttp" / "okhttp-3" instrumentation names so we don't introduce a + // separately-toggleable feature flag (DD_TRACE_OKHTTP_ASYNC_CALL_ENABLED). The capture here + // is conceptually part of the OkHttp instrumentation — if you disable OkHttp tracing, you + // also disable this capture, which is the right behavior. + super("okhttp", "okhttp-3"); + } + + @Override + public String[] knownMatchingTypes() { + return new String[] { + "okhttp3.RealCall$AsyncCall", // OkHttp 3.x + "okhttp3.internal.connection.RealCall$AsyncCall", // OkHttp 4.x + }; + } + + @Override + public Map contextStore() { + // Same Runnable -> State store that RunnableInstrumentation reads from. + return singletonMap("java.lang.Runnable", State.class.getName()); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct"); + } + + public static final class Construct { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void captureScope(@Advice.This Runnable asyncCall) { + // AdviceUtils.capture is a no-op when async propagation is disabled or there's no active + // span — same behavior as the rest of the concurrent instrumentation. + capture(InstrumentationContext.get(Runnable.class, State.class), asyncCall); + } + } +} diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java new file mode 100644 index 00000000000..2261251f24c --- /dev/null +++ b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java @@ -0,0 +1,321 @@ +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.sun.net.httpserver.HttpServer; +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.core.DDSpan; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.Dispatcher; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * End-to-end reproduction of profiling-backend PR#8520: swap OkHttp's Dispatcher executor from + * {@code Executors.newCachedThreadPool(...)} to {@code + * Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name(prefix, start).factory())}. + * + *

The test runs both shapes against the same {@link HttpServer} mock. Inside a manually + * activated "parent" span it does {@code client.newCall(request).enqueue(callback)} and waits on a + * latch for the callback. The agent's OkHttp instrumentation injects {@code TracingInterceptor}, + * which creates the {@code okhttp.request} client span using whatever scope is active on the + * dispatcher worker. The assertions verify the client span lands under the parent — i.e., + * the dispatcher's worker thread saw the propagated scope. + * + *

If propagation fails for the virtual-thread shape (the failure profiling-backend is reporting) + * the {@code okhttp.request} span will either become a root span in its own trace or be parented + * under nothing, and {@code assertEquals(parentSpan.getSpanId(), okhttpSpan.getParentId())} fails. + */ +class OkHttpVirtualThreadDispatcherTest extends AbstractInstrumentationTest { + + private static HttpServer mockServer; + private static String baseUrl; + + @BeforeAll + static void startServer() throws IOException { + mockServer = HttpServer.create(new InetSocketAddress("localhost", 0), 64); + mockServer.createContext( + "/ok", + exchange -> { + byte[] body = "ok".getBytes(); + exchange.sendResponseHeaders(200, body.length); + exchange.getResponseBody().write(body); + exchange.close(); + }); + // Default executor is single-threaded — give the server real concurrency so the test isn't + // bottlenecked on the mock backend itself. + mockServer.setExecutor(Executors.newCachedThreadPool()); + mockServer.start(); + baseUrl = + "http://" + + mockServer.getAddress().getHostString() + + ":" + + mockServer.getAddress().getPort(); + } + + @AfterAll + static void stopServer() { + if (mockServer != null) { + mockServer.stop(0); + mockServer = null; + } + } + + /** Activate a manual parent span, run the OkHttp call, wait for the okhttp.request child. */ + private void runUnderParent(OkHttpClient client, CountDownLatch done) { + AgentSpan parentSpan = AgentTracer.startSpan("test", "parent"); + try (AgentScope ignored = AgentTracer.activateSpan(parentSpan)) { + Request request = new Request.Builder().url(baseUrl + "/ok").build(); + client + .newCall(request) + .enqueue( + new Callback() { + @Override + public void onResponse(Call call, Response response) throws IOException { + response.body().close(); + done.countDown(); + } + + @Override + public void onFailure(Call call, IOException e) { + done.countDown(); + } + }); + if (!done.await(10, TimeUnit.SECONDS)) { + throw new AssertionError("timed out waiting for OkHttp callback"); + } + // Wait for the okhttp.request child to finish before closing the parent scope so the trace + // collector sees a complete trace. + blockUntilChildSpansFinished(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } finally { + parentSpan.finish(); + } + } + + @Test + void cachedThreadPoolDispatcher_parentsOkHttpSpanUnderParent() throws Exception { + ExecutorService dispatcherExecutor = Executors.newCachedThreadPool(); + OkHttpClient client = buildClient(dispatcherExecutor); + try { + runUnderParent(client, new CountDownLatch(1)); + } finally { + dispatcherExecutor.shutdown(); + } + + assertOkHttpSpanParentedUnderParent(); + } + + @Test + void virtualThreadPerTaskDispatcher_parentsOkHttpSpanUnderParent() throws Exception { + // Exact shape from profiling-backend PR#8520. + ExecutorService dispatcherExecutor = + Executors.newThreadPerTaskExecutor( + Thread.ofVirtual().name("okhttp-test-", 0).factory()); + OkHttpClient client = buildClient(dispatcherExecutor); + try { + runUnderParent(client, new CountDownLatch(1)); + } finally { + dispatcherExecutor.shutdown(); + } + + assertOkHttpSpanParentedUnderParent(); + } + + /** + * Concurrent stress test: spin up N independent parent traces, each enqueueing M OkHttp requests + * through the same virtual-thread dispatcher. Dispatcher capacity is intentionally set below N*M + * so that some calls get queued and then promoted from {@code Dispatcher.finished()} running on a + * dispatcher worker thread (a different parent's worker). + * + *

If the agent captures the worker's currently-active scope when the promoted call is + * submitted — instead of the scope active where the original {@code enqueue()} happened + * — the {@code okhttp.request} span will land in the wrong parent's trace. This test fails + * loudly on that cross-contamination. + */ + @Test + void concurrentVirtualThreadPerTaskDispatcher_keepsEachTraceSeparate() throws Exception { + int parentCount = 16; + int requestsPerParent = 4; + int totalRequests = parentCount * requestsPerParent; + + ExecutorService dispatcherExecutor = + Executors.newThreadPerTaskExecutor( + Thread.ofVirtual().name("okhttp-burst-", 0).factory()); + OkHttpClient client = buildClient(dispatcherExecutor); + // Force queue contention: capacity is much smaller than the total in-flight requests, so + // many calls will be promoted from finished() rather than direct from enqueue(). + Dispatcher dispatcher = client.dispatcher(); + dispatcher.setMaxRequests(4); + dispatcher.setMaxRequestsPerHost(4); + + ExecutorService parentRunner = Executors.newFixedThreadPool(parentCount); + CountDownLatch allParentsDone = new CountDownLatch(parentCount); + AtomicReference failure = new AtomicReference<>(); + + try { + for (int i = 0; i < parentCount; i++) { + parentRunner.submit( + () -> { + try { + runParentBurst(client, requestsPerParent); + } catch (Throwable t) { + failure.compareAndSet(null, t); + } finally { + allParentsDone.countDown(); + } + }); + } + assertTrue( + allParentsDone.await(60, TimeUnit.SECONDS), "parent threads timed out"); + } finally { + parentRunner.shutdown(); + dispatcherExecutor.shutdown(); + } + + if (failure.get() != null) { + throw new AssertionError("a parent-runner thread threw", failure.get()); + } + + // Each parent produces one trace containing 1 parent span + M okhttp.request spans. + writer.waitForTraces(parentCount); + + // Map trace-id -> spans-in-that-trace and assert structure. + Map> tracesByRoot = new HashMap<>(); + for (List trace : writer) { + DDSpan parentSpan = findByOp(trace, "parent"); + assertNotNull(parentSpan, "every collected trace should have a parent span"); + tracesByRoot.put(parentSpan.getSpanId(), trace); + } + assertEquals( + parentCount, + tracesByRoot.size(), + "expected one distinct trace per parent burst (no cross-trace contamination)"); + + int totalOkhttpSpans = 0; + List contamination = new ArrayList<>(); + for (Map.Entry> entry : tracesByRoot.entrySet()) { + long parentSpanId = entry.getKey(); + List trace = entry.getValue(); + int okhttpCountInThisTrace = 0; + for (DDSpan span : trace) { + if ("okhttp.request".contentEquals(span.getOperationName())) { + okhttpCountInThisTrace++; + if (span.getParentId() != parentSpanId) { + contamination.add( + "okhttp.request span " + + span.getSpanId() + + " has parentId=" + + span.getParentId() + + " but it lives in the trace rooted at " + + parentSpanId); + } + } + } + totalOkhttpSpans += okhttpCountInThisTrace; + assertEquals( + requestsPerParent, + okhttpCountInThisTrace, + "trace rooted at parent " + parentSpanId + " has wrong child count"); + } + + assertEquals( + totalRequests, totalOkhttpSpans, "total okhttp.request spans across all traces"); + assertTrue( + contamination.isEmpty(), + "found cross-parented okhttp.request spans:\n - " + String.join("\n - ", contamination)); + } + + /** + * One parent burst: M OkHttp requests under a single freshly-started parent span. Deliberately + * does not block on per-parent child-span accounting — the whole point of the test + * is to detect when children leak to a sibling's trace, and per-parent blocking would just turn + * that into a timeout instead of producing a useful assertion message. Wait for the HTTP + * callbacks (so the request actually ran), then close the parent. + */ + private void runParentBurst(OkHttpClient client, int requestsPerParent) { + AgentSpan parentSpan = AgentTracer.startSpan("test", "parent"); + try (AgentScope ignored = AgentTracer.activateSpan(parentSpan)) { + CountDownLatch done = new CountDownLatch(requestsPerParent); + for (int i = 0; i < requestsPerParent; i++) { + Request request = new Request.Builder().url(baseUrl + "/ok").build(); + client + .newCall(request) + .enqueue( + new Callback() { + @Override + public void onResponse(Call call, Response response) throws IOException { + response.body().close(); + done.countDown(); + } + + @Override + public void onFailure(Call call, IOException e) { + done.countDown(); + } + }); + } + if (!done.await(30, TimeUnit.SECONDS)) { + throw new AssertionError("timed out waiting for OkHttp callbacks"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } finally { + parentSpan.finish(); + } + } + + private static OkHttpClient buildClient(ExecutorService dispatcherExecutor) { + Dispatcher dispatcher = new Dispatcher(dispatcherExecutor); + return new OkHttpClient.Builder().dispatcher(dispatcher).build(); + } + + private void assertOkHttpSpanParentedUnderParent() throws Exception { + writer.waitForTraces(1); + List trace = writer.get(0); + DDSpan parentSpan = findByOp(trace, "parent"); + DDSpan okhttpSpan = findByOp(trace, "okhttp.request"); + assertNotNull(parentSpan, "parent span should exist"); + assertNotNull( + okhttpSpan, + "okhttp.request client span should exist; if missing, propagation may have produced an" + + " orphan trace instead"); + assertEquals( + parentSpan.getTraceId().toLong(), + okhttpSpan.getTraceId().toLong(), + "okhttp.request span must share the parent's trace id"); + assertEquals( + parentSpan.getSpanId(), + okhttpSpan.getParentId(), + "okhttp.request span must be parented under the parent span active at enqueue() time"); + } + + private static DDSpan findByOp(List spans, String op) { + return spans.stream() + .filter(s -> op.contentEquals(s.getOperationName())) + .findFirst() + .orElse(null); + } +}