diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-21.0/src/test/java/ThreadPerTaskExecutorVirtualThreadTest.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-21.0/src/test/java/ThreadPerTaskExecutorVirtualThreadTest.java new file mode 100644 index 00000000000..53f7a348a44 --- /dev/null +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-21.0/src/test/java/ThreadPerTaskExecutorVirtualThreadTest.java @@ -0,0 +1,190 @@ +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.api.Trace; +import datadog.trace.core.DDSpan; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.jupiter.api.Test; + +/** + * Reproductions for the executor swap profiling-backend PR#8520 made for OkHttp's Dispatcher + * (cached thread pool → {@code Executors.newThreadPerTaskExecutor(Thread.ofVirtual()...)}). + * + *

Each test asserts that an {@code @Trace}-annotated child method invoked from inside a task + * submitted to the virtual-thread-per-task executor attaches to the parent span that was active at + * submission time. If propagation breaks, the child either lands in a different trace or floats + * free as a root span and the {@code assertEquals(parentSpanId, childSpan.getParentId())} check + * fails. + * + *

This mirrors what would happen with OkHttp: the {@code okhttp} client span is created by + * {@code TracingInterceptor.intercept()} on the executor thread using {@code activeSpan()}, so "is + * the parent scope active on the worker?" is the entire question. + */ +class ThreadPerTaskExecutorVirtualThreadTest extends AbstractInstrumentationTest { + + @Trace(operationName = "parent") + void underParentTrace(int expectedChildren, Runnable body) { + body.run(); + blockUntilChildSpansFinished(expectedChildren); + } + + @Trace(operationName = "child") + static void child() {} + + @Test + void virtualThreadFactory_propagatesActiveScope() throws Exception { + ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory()); + try { + underParentTrace(1, () -> executor.execute(ThreadPerTaskExecutorVirtualThreadTest::child)); + } finally { + executor.shutdown(); + } + + writer.waitForTraces(1); + List trace = writer.get(0); + assertEquals(2, trace.size(), "expected parent + child span"); + DDSpan parentSpan = findByOp(trace, "parent"); + DDSpan childSpan = findByOp(trace, "child"); + assertNotNull(parentSpan, "parent span should exist"); + assertNotNull(childSpan, "child span should exist"); + assertEquals( + parentSpan.getSpanId(), + childSpan.getParentId(), + "child span should be parented under the active span at executor.execute()"); + } + + @Test + void namedVirtualThreadFactory_propagatesActiveScope() throws Exception { + // Exact builder shape from profiling-backend PR#8520: + // Thread.ofVirtual().name("okhttp-" + track + "-", 0).factory() + ExecutorService executor = + Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("okhttp-test-", 0).factory()); + try { + underParentTrace(1, () -> executor.execute(ThreadPerTaskExecutorVirtualThreadTest::child)); + } finally { + executor.shutdown(); + } + + writer.waitForTraces(1); + List trace = writer.get(0); + DDSpan parentSpan = findByOp(trace, "parent"); + DDSpan childSpan = findByOp(trace, "child"); + assertNotNull(parentSpan, "parent span should exist"); + assertNotNull(childSpan, "child span should exist"); + assertEquals( + parentSpan.getSpanId(), + childSpan.getParentId(), + "the .name(prefix, start) builder should not break propagation"); + } + + /** + * Mirrors OkHttp's dispatcher recursion: a task running on the executor submits more work back to + * the same executor. In OkHttp this is {@code Dispatcher.finished() → promoteAndExecute() + * → executorService.execute(nextAsyncCall)}, called from inside an {@code AsyncCall.run()} + * on a dispatcher thread. + * + *

Both child spans should land in the parent's trace. If the worker thread loses the parent + * scope between the outer activation and the inner submission, the second child either becomes a + * new root span or gets attached to the wrong sibling. + */ + @Test + void recursiveSubmissionFromWorkerThread_keepsTraceConnected() throws Exception { + ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory()); + try { + underParentTrace( + 2, + () -> + executor.execute( + () -> { + child(); + executor.execute(ThreadPerTaskExecutorVirtualThreadTest::child); + })); + } finally { + executor.shutdown(); + } + + writer.waitForTraces(1); + List trace = writer.get(0); + DDSpan parentSpan = findByOp(trace, "parent"); + assertNotNull(parentSpan, "parent span should exist"); + long parentTraceId = parentSpan.getTraceId().toLong(); + long parentSpanId = parentSpan.getSpanId(); + + long childCount = + trace.stream().filter(s -> "child".contentEquals(s.getOperationName())).count(); + assertEquals(2, childCount, "both child spans should land in the same trace as the parent"); + + trace.stream() + .filter(s -> "child".contentEquals(s.getOperationName())) + .forEach( + s -> { + assertEquals( + parentTraceId, + s.getTraceId().toLong(), + "child span must share the parent's trace"); + assertEquals( + parentSpanId, + s.getParentId(), + "child span must attach to the parent, not float free"); + }); + } + + /** + * Forces the virtual thread to unmount and remount mid-task by sleeping between two child spans. + * This is the case OkHttp actually hits in practice — every blocking socket read unmounts + * the carrier, and the scope stack has to be reinstated by {@code VirtualThreadInstrumentation} + * on each remount. The fix log for this area is long (#10931, #11009, #11111), so it is worth + * pinning down with a regression test. + */ + @Test + void virtualThreadFactory_propagatesAcrossUnmount() throws Exception { + ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory()); + try { + underParentTrace( + 2, + () -> + executor.execute( + () -> { + child(); + try { + // Sleep is a JDK 21+ carrier-unmounting parking point for virtual threads. + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + child(); + })); + } finally { + executor.shutdown(); + } + + writer.waitForTraces(1); + List trace = writer.get(0); + DDSpan parentSpan = findByOp(trace, "parent"); + assertNotNull(parentSpan, "parent span should exist"); + long parentSpanId = parentSpan.getSpanId(); + + long childCount = + trace.stream().filter(s -> "child".contentEquals(s.getOperationName())).count(); + assertEquals(2, childCount, "both child spans should be captured"); + + trace.stream() + .filter(s -> "child".contentEquals(s.getOperationName())) + .forEach( + s -> + assertEquals( + parentSpanId, + s.getParentId(), + "child span before and after virtual-thread unmount must both attach to parent")); + } + + private static DDSpan findByOp(List spans, String op) { + return spans.stream() + .filter(s -> op.contentEquals(s.getOperationName())) + .findFirst() + .orElse(null); + } +} diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle index 5ce7c997f99..e2ce1db8684 100644 --- a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle +++ b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/build.gradle @@ -8,9 +8,30 @@ muzzle { } apply from: "$rootDir/gradle/java.gradle" +// Use slf4j-simple for tests; logback's synchronized appenders can pin virtual-thread carriers +// when many vthreads log concurrently, which deadlocks this module's vthread21Test suite. +apply from: "$rootDir/gradle/slf4j-simple.gradle" addTestSuiteForDir('latestDepTest', 'test') +// Separate suite for tests that need the JDK 21+ virtual-thread API. Kept apart from the +// default test suite so that the existing OkHttp 3.0 tests keep their Java 1.8 baseline. +addTestSuite('vthread21Test') + +tasks.named("compileVthread21TestJava", JavaCompile) { + configureCompiler(it, 21) +} + +tasks.named("vthread21Test", Test) { + javaLauncher = javaToolchains.launcherFor { + languageVersion = JavaLanguageVersion.of(21) + } +} + +tasks.named("check") { + dependsOn "vthread21Test" +} + dependencies { compileOnly(group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.0.0') @@ -38,4 +59,14 @@ dependencies { testRuntimeOnly(project(':dd-java-agent:instrumentation:datadog:asm:iast-instrumenter')) testRuntimeOnly(project(':dd-java-agent:instrumentation:java:java-net:java-net-1.8')) + + // vthread21Test inherits testImplementation via addTestSuite's extendsFrom wiring. + vthread21TestImplementation(group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.11.0') + vthread21TestImplementation(group: 'com.squareup.okio', name: 'okio', version: '1.14.0') + + // Pull in the JDK-21+ concurrent / lang instrumentations so the test installs the same + // TaskRunnerInstrumentation + VirtualThreadInstrumentation chain that profiling-backend + // exercises in production. + vthread21TestRuntimeOnly project(':dd-java-agent:instrumentation:java:java-concurrent:java-concurrent-21.0') + vthread21TestRuntimeOnly project(':dd-java-agent:instrumentation:java:java-lang:java-lang-21.0') } diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/main/java/datadog/trace/instrumentation/okhttp3/AsyncCallInstrumentation.java b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/main/java/datadog/trace/instrumentation/okhttp3/AsyncCallInstrumentation.java new file mode 100644 index 00000000000..0d9d9295b99 --- /dev/null +++ b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/main/java/datadog/trace/instrumentation/okhttp3/AsyncCallInstrumentation.java @@ -0,0 +1,78 @@ +package datadog.trace.instrumentation.okhttp3; + +import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import java.util.Map; +import net.bytebuddy.asm.Advice; + +/** + * Captures the active scope at {@code AsyncCall.} (i.e. the moment {@code + * Call.enqueue(callback)} was invoked on the user's thread) and stores it in the {@code + * ContextStore} shared with the rest of the concurrent instrumentation. {@code + * RunnableInstrumentation} then re-activates that scope on {@code AsyncCall.run()} entry, which + * overrides whatever scope {@code TaskRunner.run()} (or {@code beforeExecute}) put in place from + * the dispatcher's worker thread. + * + *

Without this, {@code TaskRunnerInstrumentation} captures whatever scope happens to be active + * on the worker thread when {@code Dispatcher.promoteAndExecute()} dequeues and submits the call — + * and when promotion runs from inside {@code Dispatcher.finished()} (i.e. recursively from a + * different AsyncCall's run()), that scope belongs to the finishing call, not to the + * caller who actually enqueued this AsyncCall. Result: under concurrent OkHttp load, {@code + * okhttp.request} spans cross-contaminate between traces. + * + *

The class moved between OkHttp 3.x and 4.x: + * + *

+ * + * Both are inner classes of {@code RealCall} and both transitively implement {@link Runnable}. + */ +@AutoService(InstrumenterModule.class) +public final class AsyncCallInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice { + + public AsyncCallInstrumentation() { + // Re-use the existing "okhttp" / "okhttp-3" instrumentation names so we don't introduce a + // separately-toggleable feature flag (DD_TRACE_OKHTTP_ASYNC_CALL_ENABLED). The capture here + // is conceptually part of the OkHttp instrumentation — if you disable OkHttp tracing, you + // also disable this capture, which is the right behavior. + super("okhttp", "okhttp-3"); + } + + @Override + public String[] knownMatchingTypes() { + return new String[] { + "okhttp3.RealCall$AsyncCall", // OkHttp 3.x + "okhttp3.internal.connection.RealCall$AsyncCall", // OkHttp 4.x + }; + } + + @Override + public Map contextStore() { + // Same Runnable -> State store that RunnableInstrumentation reads from. + return singletonMap("java.lang.Runnable", State.class.getName()); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct"); + } + + public static final class Construct { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void captureScope(@Advice.This Runnable asyncCall) { + // AdviceUtils.capture is a no-op when async propagation is disabled or there's no active + // span — same behavior as the rest of the concurrent instrumentation. + capture(InstrumentationContext.get(Runnable.class, State.class), asyncCall); + } + } +} diff --git a/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java new file mode 100644 index 00000000000..2261251f24c --- /dev/null +++ b/dd-java-agent/instrumentation/okhttp/okhttp-3.0/src/vthread21Test/java/OkHttpVirtualThreadDispatcherTest.java @@ -0,0 +1,321 @@ +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.sun.net.httpserver.HttpServer; +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.core.DDSpan; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.Dispatcher; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * End-to-end reproduction of profiling-backend PR#8520: swap OkHttp's Dispatcher executor from + * {@code Executors.newCachedThreadPool(...)} to {@code + * Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name(prefix, start).factory())}. + * + *

The test runs both shapes against the same {@link HttpServer} mock. Inside a manually + * activated "parent" span it does {@code client.newCall(request).enqueue(callback)} and waits on a + * latch for the callback. The agent's OkHttp instrumentation injects {@code TracingInterceptor}, + * which creates the {@code okhttp.request} client span using whatever scope is active on the + * dispatcher worker. The assertions verify the client span lands under the parent — i.e., + * the dispatcher's worker thread saw the propagated scope. + * + *

If propagation fails for the virtual-thread shape (the failure profiling-backend is reporting) + * the {@code okhttp.request} span will either become a root span in its own trace or be parented + * under nothing, and {@code assertEquals(parentSpan.getSpanId(), okhttpSpan.getParentId())} fails. + */ +class OkHttpVirtualThreadDispatcherTest extends AbstractInstrumentationTest { + + private static HttpServer mockServer; + private static String baseUrl; + + @BeforeAll + static void startServer() throws IOException { + mockServer = HttpServer.create(new InetSocketAddress("localhost", 0), 64); + mockServer.createContext( + "/ok", + exchange -> { + byte[] body = "ok".getBytes(); + exchange.sendResponseHeaders(200, body.length); + exchange.getResponseBody().write(body); + exchange.close(); + }); + // Default executor is single-threaded — give the server real concurrency so the test isn't + // bottlenecked on the mock backend itself. + mockServer.setExecutor(Executors.newCachedThreadPool()); + mockServer.start(); + baseUrl = + "http://" + + mockServer.getAddress().getHostString() + + ":" + + mockServer.getAddress().getPort(); + } + + @AfterAll + static void stopServer() { + if (mockServer != null) { + mockServer.stop(0); + mockServer = null; + } + } + + /** Activate a manual parent span, run the OkHttp call, wait for the okhttp.request child. */ + private void runUnderParent(OkHttpClient client, CountDownLatch done) { + AgentSpan parentSpan = AgentTracer.startSpan("test", "parent"); + try (AgentScope ignored = AgentTracer.activateSpan(parentSpan)) { + Request request = new Request.Builder().url(baseUrl + "/ok").build(); + client + .newCall(request) + .enqueue( + new Callback() { + @Override + public void onResponse(Call call, Response response) throws IOException { + response.body().close(); + done.countDown(); + } + + @Override + public void onFailure(Call call, IOException e) { + done.countDown(); + } + }); + if (!done.await(10, TimeUnit.SECONDS)) { + throw new AssertionError("timed out waiting for OkHttp callback"); + } + // Wait for the okhttp.request child to finish before closing the parent scope so the trace + // collector sees a complete trace. + blockUntilChildSpansFinished(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } finally { + parentSpan.finish(); + } + } + + @Test + void cachedThreadPoolDispatcher_parentsOkHttpSpanUnderParent() throws Exception { + ExecutorService dispatcherExecutor = Executors.newCachedThreadPool(); + OkHttpClient client = buildClient(dispatcherExecutor); + try { + runUnderParent(client, new CountDownLatch(1)); + } finally { + dispatcherExecutor.shutdown(); + } + + assertOkHttpSpanParentedUnderParent(); + } + + @Test + void virtualThreadPerTaskDispatcher_parentsOkHttpSpanUnderParent() throws Exception { + // Exact shape from profiling-backend PR#8520. + ExecutorService dispatcherExecutor = + Executors.newThreadPerTaskExecutor( + Thread.ofVirtual().name("okhttp-test-", 0).factory()); + OkHttpClient client = buildClient(dispatcherExecutor); + try { + runUnderParent(client, new CountDownLatch(1)); + } finally { + dispatcherExecutor.shutdown(); + } + + assertOkHttpSpanParentedUnderParent(); + } + + /** + * Concurrent stress test: spin up N independent parent traces, each enqueueing M OkHttp requests + * through the same virtual-thread dispatcher. Dispatcher capacity is intentionally set below N*M + * so that some calls get queued and then promoted from {@code Dispatcher.finished()} running on a + * dispatcher worker thread (a different parent's worker). + * + *

If the agent captures the worker's currently-active scope when the promoted call is + * submitted — instead of the scope active where the original {@code enqueue()} happened + * — the {@code okhttp.request} span will land in the wrong parent's trace. This test fails + * loudly on that cross-contamination. + */ + @Test + void concurrentVirtualThreadPerTaskDispatcher_keepsEachTraceSeparate() throws Exception { + int parentCount = 16; + int requestsPerParent = 4; + int totalRequests = parentCount * requestsPerParent; + + ExecutorService dispatcherExecutor = + Executors.newThreadPerTaskExecutor( + Thread.ofVirtual().name("okhttp-burst-", 0).factory()); + OkHttpClient client = buildClient(dispatcherExecutor); + // Force queue contention: capacity is much smaller than the total in-flight requests, so + // many calls will be promoted from finished() rather than direct from enqueue(). + Dispatcher dispatcher = client.dispatcher(); + dispatcher.setMaxRequests(4); + dispatcher.setMaxRequestsPerHost(4); + + ExecutorService parentRunner = Executors.newFixedThreadPool(parentCount); + CountDownLatch allParentsDone = new CountDownLatch(parentCount); + AtomicReference failure = new AtomicReference<>(); + + try { + for (int i = 0; i < parentCount; i++) { + parentRunner.submit( + () -> { + try { + runParentBurst(client, requestsPerParent); + } catch (Throwable t) { + failure.compareAndSet(null, t); + } finally { + allParentsDone.countDown(); + } + }); + } + assertTrue( + allParentsDone.await(60, TimeUnit.SECONDS), "parent threads timed out"); + } finally { + parentRunner.shutdown(); + dispatcherExecutor.shutdown(); + } + + if (failure.get() != null) { + throw new AssertionError("a parent-runner thread threw", failure.get()); + } + + // Each parent produces one trace containing 1 parent span + M okhttp.request spans. + writer.waitForTraces(parentCount); + + // Map trace-id -> spans-in-that-trace and assert structure. + Map> tracesByRoot = new HashMap<>(); + for (List trace : writer) { + DDSpan parentSpan = findByOp(trace, "parent"); + assertNotNull(parentSpan, "every collected trace should have a parent span"); + tracesByRoot.put(parentSpan.getSpanId(), trace); + } + assertEquals( + parentCount, + tracesByRoot.size(), + "expected one distinct trace per parent burst (no cross-trace contamination)"); + + int totalOkhttpSpans = 0; + List contamination = new ArrayList<>(); + for (Map.Entry> entry : tracesByRoot.entrySet()) { + long parentSpanId = entry.getKey(); + List trace = entry.getValue(); + int okhttpCountInThisTrace = 0; + for (DDSpan span : trace) { + if ("okhttp.request".contentEquals(span.getOperationName())) { + okhttpCountInThisTrace++; + if (span.getParentId() != parentSpanId) { + contamination.add( + "okhttp.request span " + + span.getSpanId() + + " has parentId=" + + span.getParentId() + + " but it lives in the trace rooted at " + + parentSpanId); + } + } + } + totalOkhttpSpans += okhttpCountInThisTrace; + assertEquals( + requestsPerParent, + okhttpCountInThisTrace, + "trace rooted at parent " + parentSpanId + " has wrong child count"); + } + + assertEquals( + totalRequests, totalOkhttpSpans, "total okhttp.request spans across all traces"); + assertTrue( + contamination.isEmpty(), + "found cross-parented okhttp.request spans:\n - " + String.join("\n - ", contamination)); + } + + /** + * One parent burst: M OkHttp requests under a single freshly-started parent span. Deliberately + * does not block on per-parent child-span accounting — the whole point of the test + * is to detect when children leak to a sibling's trace, and per-parent blocking would just turn + * that into a timeout instead of producing a useful assertion message. Wait for the HTTP + * callbacks (so the request actually ran), then close the parent. + */ + private void runParentBurst(OkHttpClient client, int requestsPerParent) { + AgentSpan parentSpan = AgentTracer.startSpan("test", "parent"); + try (AgentScope ignored = AgentTracer.activateSpan(parentSpan)) { + CountDownLatch done = new CountDownLatch(requestsPerParent); + for (int i = 0; i < requestsPerParent; i++) { + Request request = new Request.Builder().url(baseUrl + "/ok").build(); + client + .newCall(request) + .enqueue( + new Callback() { + @Override + public void onResponse(Call call, Response response) throws IOException { + response.body().close(); + done.countDown(); + } + + @Override + public void onFailure(Call call, IOException e) { + done.countDown(); + } + }); + } + if (!done.await(30, TimeUnit.SECONDS)) { + throw new AssertionError("timed out waiting for OkHttp callbacks"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } finally { + parentSpan.finish(); + } + } + + private static OkHttpClient buildClient(ExecutorService dispatcherExecutor) { + Dispatcher dispatcher = new Dispatcher(dispatcherExecutor); + return new OkHttpClient.Builder().dispatcher(dispatcher).build(); + } + + private void assertOkHttpSpanParentedUnderParent() throws Exception { + writer.waitForTraces(1); + List trace = writer.get(0); + DDSpan parentSpan = findByOp(trace, "parent"); + DDSpan okhttpSpan = findByOp(trace, "okhttp.request"); + assertNotNull(parentSpan, "parent span should exist"); + assertNotNull( + okhttpSpan, + "okhttp.request client span should exist; if missing, propagation may have produced an" + + " orphan trace instead"); + assertEquals( + parentSpan.getTraceId().toLong(), + okhttpSpan.getTraceId().toLong(), + "okhttp.request span must share the parent's trace id"); + assertEquals( + parentSpan.getSpanId(), + okhttpSpan.getParentId(), + "okhttp.request span must be parented under the parent span active at enqueue() time"); + } + + private static DDSpan findByOp(List spans, String op) { + return spans.stream() + .filter(s -> op.contentEquals(s.getOperationName())) + .findFirst() + .orElse(null); + } +}