diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java index dec3a8eb9e5..7c151dab5c4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java @@ -33,6 +33,10 @@ public List firstTrace() { return get(0); } + public int getTraceCount() { + return traceCount.get(); + } + @SuppressFBWarnings("NN_NAKED_NOTIFY") @Override public void write(List trace) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index 6c5efb4a0d1..55db6962976 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -1391,6 +1391,11 @@ public String getSpanId(AgentSpan span) { return "0"; } + // @VisibleForTesting + TraceInterceptors getInterceptors() { + return interceptors; + } + @Override public boolean addTraceInterceptor(final TraceInterceptor interceptor) { TraceInterceptor conflictingInterceptor = interceptors.add(interceptor); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java index 6949ddf31de..2c62819e97a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java @@ -125,7 +125,8 @@ static DDSpan create( * @param timestampMicro if greater than zero, use this time instead of the current time * @param context the context used for the span */ - private DDSpan( + // @VisibleForTesting + DDSpan( @Nonnull String instrumentationName, final long timestampMicro, @Nonnull DDSpanContext context, @@ -680,6 +681,11 @@ public long getDurationNano() { return durationNano; } + // @VisibleForTesting + void setDurationNano(long duration) { + 
DURATION_NANO_UPDATER.set(this, duration); + } + @Override public String getServiceName() { return context.getServiceName(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java index 776c30870dc..5a0d1e992a4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java @@ -139,7 +139,8 @@ public PendingTrace create(@Nonnull DDTraceId traceId, ConfigSnapshot traceConfi private static final AtomicLongFieldUpdater LAST_REFERENCED = AtomicLongFieldUpdater.newUpdater(PendingTrace.class, "lastReferenced"); - private PendingTrace( + // @VisibleForTesting + PendingTrace( @Nonnull CoreTracer tracer, @Nonnull DDTraceId traceId, @Nonnull PendingTraceBuffer pendingTraceBuffer, @@ -238,6 +239,26 @@ int getPendingReferenceCount() { return pendingReferenceCount; } + // @VisibleForTesting + PendingTraceBuffer getPendingTraceBuffer() { + return pendingTraceBuffer; + } + + // @VisibleForTesting + DDTraceId getTraceId() { + return traceId; + } + + // @VisibleForTesting + boolean isRootSpanWritten() { + return rootSpanWritten; + } + + // @VisibleForTesting + int getIsEnqueued() { + return isEnqueued; + } + boolean empty() { return 0 >= COMPLETED_SPAN_COUNT.get(this) + PENDING_REFERENCE_COUNT.get(this); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java index ad89c996d90..9cd7f73eb2e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java @@ -308,6 +308,16 @@ public DelayingPendingTraceBuffer( LongRunningTracesTracker getRunningTracesTracker() { return runningTracesTracker; } + + // @VisibleForTesting + Thread getWorker() { + return worker; + } + + // @VisibleForTesting + MessagePassingBlockingQueue 
getQueue() { + return queue; + } } static class DiscardingPendingTraceBuffer extends PendingTraceBuffer { diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy deleted file mode 100644 index 62738a2ff0a..00000000000 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy +++ /dev/null @@ -1,629 +0,0 @@ -package datadog.trace.core - -import datadog.environment.JavaVirtualMachine -import datadog.metrics.api.Monitoring -import datadog.trace.api.Config -import datadog.trace.SamplingPriorityMetadataChecker -import datadog.trace.api.DDSpanId -import datadog.trace.api.DDTraceId -import datadog.trace.api.flare.TracerFlare -import datadog.trace.api.time.SystemTimeSource -import datadog.trace.api.datastreams.NoopPathwayContext -import datadog.trace.context.TraceScope -import datadog.trace.core.monitor.HealthMetrics -import datadog.trace.core.propagation.PropagationTags -import datadog.trace.core.scopemanager.ContinuableScopeManager -import datadog.trace.test.util.DDSpecification -import groovy.json.JsonSlurper -import spock.lang.IgnoreIf -import spock.lang.Subject -import spock.lang.Timeout -import spock.util.concurrent.PollingConditions -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger -import java.util.zip.ZipInputStream -import java.util.zip.ZipOutputStream - -import static datadog.trace.api.sampling.PrioritySampling.UNSET -import static datadog.trace.api.sampling.PrioritySampling.USER_KEEP -import static datadog.trace.core.PendingTraceBuffer.BUFFER_SIZE -import static java.nio.charset.StandardCharsets.UTF_8 - -@IgnoreIf(reason = """ -Oracle JDK 1.8 did not merge the fix in JDK-8058322, leading to the JVM failing to correctly -extract method parameters without args, when the code is compiled on a later JDK (targeting 8). 
-This can manifest when creating mocks. -""", value = { - JavaVirtualMachine.isOracleJDK8() -}) -@Timeout(5) -class PendingTraceBufferTest extends DDSpecification { - @Subject - def buffer = PendingTraceBuffer.delaying(SystemTimeSource.INSTANCE, Mock(Config), null, null) - def bufferSpy = Spy(buffer) - - def tracer = Mock(CoreTracer) - def traceConfig = Mock(CoreTracer.ConfigSnapshot) - def scopeManager = new ContinuableScopeManager(10, true) - def factory = new PendingTrace.Factory(tracer, bufferSpy, SystemTimeSource.INSTANCE, false, HealthMetrics.NO_OP) - List continuations = [] - - def setup() { - tracer.captureTraceConfig() >> traceConfig - traceConfig.getServiceMapping() >> [:] - } - - def cleanup() { - buffer.close() - buffer.worker.join(1000) - } - - def "test buffer lifecycle"() { - expect: - !buffer.worker.alive - - when: - buffer.start() - - then: - buffer.worker.alive - buffer.worker.daemon - - when: "start called again" - buffer.start() - - then: - thrown IllegalThreadStateException - buffer.worker.alive - buffer.worker.daemon - - when: - buffer.close() - buffer.worker.join(1000) - - then: - !buffer.worker.alive - } - - def "continuation buffers root"() { - setup: - def trace = factory.create(DDTraceId.ONE) - def span = newSpanOf(trace) - - expect: - !trace.rootSpanWritten - - when: - addContinuation(span) - span.finish() // This should enqueue - - then: - continuations.size() == 1 - trace.pendingReferenceCount == 1 - 1 * bufferSpy.longRunningSpansEnabled() - 1 * bufferSpy.enqueue(trace) - _ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - 1 * tracer.getTimeWithNanoTicks(_) - 1 * tracer.onRootSpanPublished(span) - 0 * _ - - when: - continuations[0].cancel() - - then: - trace.pendingReferenceCount == 0 - 1 * tracer.write({ it.size() == 1 }) - 1 * tracer.writeTimer() >> Monitoring.DISABLED.newTimer("") - _ * tracer.getPartialFlushMinSpans() >> 10 - 0 * _ - } - - def "unfinished child buffers root"() { - setup: - def trace = 
factory.create(DDTraceId.ONE) - def parent = newSpanOf(trace) - def child = newSpanOf(parent) - - expect: - !trace.rootSpanWritten - - when: - parent.finish() // This should enqueue - - then: - trace.size() == 1 - trace.pendingReferenceCount == 1 - 1 * bufferSpy.enqueue(trace) - _ * bufferSpy.longRunningSpansEnabled() - _ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - 1 * tracer.getTimeWithNanoTicks(_) - 1 * tracer.onRootSpanPublished(parent) - 0 * _ - - when: - child.finish() - - then: - trace.size() == 0 - trace.pendingReferenceCount == 0 - _ * bufferSpy.longRunningSpansEnabled() - 1 * tracer.write({ it.size() == 2 }) - 1 * tracer.writeTimer() >> Monitoring.DISABLED.newTimer("") - _ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - 1 * tracer.getTimeWithNanoTicks(_) - 0 * _ - } - - def "priority sampling is always sent"() { - setup: - def parent = addContinuation(newSpanOf(factory.create(DDTraceId.ONE), USER_KEEP, 0)) - def metadataChecker = new SamplingPriorityMetadataChecker() - - when: "Fill the buffer - Only children - Priority taken from root" - - for (int i = 0; i < 11; i++) { - newSpanOf(parent).finish() - } - - then: - _ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - _ * traceConfig.getServiceMapping() >> [:] - _ * tracer.getTimeWithNanoTicks(_) - 1 * tracer.writeTimer() >> Monitoring.DISABLED.newTimer("") - _ * bufferSpy.longRunningSpansEnabled() - 1 * tracer.write(_) >> { List> spans -> - spans.first().first().processTagsAndBaggage(metadataChecker) - } - 0 * _ - metadataChecker.hasSamplingPriority - } - - def "buffer full yields immediate write"() { - setup: - // Don't start the buffer thread - - when: "Fill the buffer" - for (i in 1..buffer.queue.capacity()) { - addContinuation(newSpanOf(factory.create(DDTraceId.ONE))).finish() - } - - then: - _ * tracer.captureTraceConfig() >> traceConfig - buffer.queue.size() == BUFFER_SIZE - buffer.queue.capacity() * 
bufferSpy.enqueue(_) - _ * bufferSpy.longRunningSpansEnabled() - _ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - _ * traceConfig.getServiceMapping() >> [:] - _ * tracer.getTimeWithNanoTicks(_) - buffer.queue.capacity() * tracer.onRootSpanPublished(_) - 0 * _ - - when: - def pendingTrace = factory.create(DDTraceId.ONE) - addContinuation(newSpanOf(pendingTrace)).finish() - - then: - 1 * tracer.captureTraceConfig() >> traceConfig - 1 * bufferSpy.enqueue(_) - 1 * tracer.writeTimer() >> Monitoring.DISABLED.newTimer("") - _ * bufferSpy.longRunningSpansEnabled() - 1 * tracer.write({ it.size() == 1 }) - _ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - _ * traceConfig.getServiceMapping() >> [:] - 2 * tracer.getTimeWithNanoTicks(_) - 1 * tracer.onRootSpanPublished(_) - 0 * _ - pendingTrace.isEnqueued == 0 - } - - def "long-running trace: buffer full does not trigger write"() { - setup: - // Don't start the buffer thread - - when: "Fill the buffer" - for (i in 1..buffer.queue.capacity()) { - addContinuation(newSpanOf(factory.create(DDTraceId.ONE))).finish() - } - - then: - _ * tracer.captureTraceConfig() >> traceConfig - buffer.queue.size() == BUFFER_SIZE - buffer.queue.capacity() * bufferSpy.enqueue(_) - _ * bufferSpy.longRunningSpansEnabled() - _ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - _ * traceConfig.getServiceMapping() >> [:] - _ * tracer.getTimeWithNanoTicks(_) - buffer.queue.capacity() * tracer.onRootSpanPublished(_) - 0 * _ - - when: - def pendingTrace = factory.create(DDTraceId.ONE) - pendingTrace.longRunningTrackedState = LongRunningTracesTracker.TO_TRACK - addContinuation(newSpanOf(pendingTrace)).finish() - - then: - 1 * tracer.captureTraceConfig() >> traceConfig - 1 * bufferSpy.enqueue(_) - _ * bufferSpy.longRunningSpansEnabled() - 0 * tracer.writeTimer() >> Monitoring.DISABLED.newTimer("") - _ * bufferSpy.longRunningSpansEnabled() - 0 * tracer.write({ it.size() == 1 }) - 
_ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - _ * traceConfig.getServiceMapping() >> [:] - _ * tracer.getTimeWithNanoTicks(_) - 1 * tracer.onRootSpanPublished(_) - 0 * _ - - pendingTrace.isEnqueued == 0 - } - - def "continuation allows adding after root finished"() { - setup: - def trace = factory.create(DDTraceId.ONE) - def parent = addContinuation(newSpanOf(trace)) - TraceScope.Continuation continuation = continuations[0] - - expect: - continuations.size() == 1 - - when: - parent.finish() // This should enqueue - - then: - trace.size() == 1 - trace.pendingReferenceCount == 1 - !trace.rootSpanWritten - _ * bufferSpy.longRunningSpansEnabled() - 1 * bufferSpy.enqueue(trace) - _ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - 1 * tracer.getTimeWithNanoTicks(_) - 1 * tracer.onRootSpanPublished(parent) - 0 * _ - - when: - def child = newSpanOf(parent) - child.finish() - - then: - trace.size() == 2 - trace.pendingReferenceCount == 1 - !trace.rootSpanWritten - - when: - // Don't start the buffer thread here. When the continuation is cancelled, - // pendingReferenceCount drops to 0 with rootSpanWritten still false, so - // write() is called synchronously on this thread. Starting the buffer - // would introduce a race where the worker thread could process the - // enqueued trace before continuation.cancel(), causing extra mock - // invocations (TooManyInvocationsError). 
- continuation.cancel() - - then: - trace.size() == 0 - trace.pendingReferenceCount == 0 - trace.rootSpanWritten - 1 * tracer.writeTimer() >> Monitoring.DISABLED.newTimer("") - 1 * tracer.write({ it.size() == 2 }) - _ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - 0 * _ - } - - def "late arrival span requeues pending trace"() { - setup: - buffer.start() - def parentLatch = new CountDownLatch(1) - def childLatch = new CountDownLatch(1) - - def trace = factory.create(DDTraceId.ONE) - def parent = newSpanOf(trace) - - when: - parent.finish() // This should enqueue - parentLatch.await() - - then: - trace.size() == 0 - trace.pendingReferenceCount == 0 - trace.rootSpanWritten - _ * bufferSpy.longRunningSpansEnabled() - 1 * tracer.writeTimer() >> Monitoring.DISABLED.newTimer("") - 1 * tracer.write({ it.size() == 1 }) >> { - parentLatch.countDown() - } - _ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - 1 * tracer.getTimeWithNanoTicks(_) - 1 * tracer.onRootSpanPublished(parent) - 0 * _ - - when: - def child = newSpanOf(parent) - child.finish() - childLatch.await() - - then: - trace.size() == 0 - trace.pendingReferenceCount == 0 - trace.rootSpanWritten - 1 * bufferSpy.enqueue(trace) - _ * bufferSpy.longRunningSpansEnabled() - _ * tracer.getPartialFlushMinSpans() >> 10 - _ * tracer.getTagInterceptor() - 1 * tracer.writeTimer() >> Monitoring.DISABLED.newTimer("") - 1 * tracer.write({ it.size() == 1 }) >> { - childLatch.countDown() - } - _ * traceConfig.getServiceMapping() >> [:] - _ * tracer.getTimeWithNanoTicks(_) - _ * bufferSpy.longRunningSpansEnabled() - 0 * _ - } - - def "flush clears the buffer"() { - setup: - buffer.start() - def counter = new AtomicInteger(0) - // Create a fake element that newer gets written - def element = new PendingTraceBuffer.Element() { - @Override - long oldestFinishedTime() { - return TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()) - } - - @Override - boolean 
lastReferencedNanosAgo(long nanos) { - return false - } - - @Override - void write() { - counter.incrementAndGet() - } - - @Override - DDSpan getRootSpan() { - return null - } - - @Override - boolean setEnqueued(boolean enqueued) { - return true - } - @Override - boolean writeOnBufferFull() { - return true - } - } - - when: - buffer.enqueue(element) - buffer.enqueue(element) - buffer.enqueue(element) - - then: - counter.get() == 0 - - when: - buffer.flush() - - then: - counter.get() == 3 - } - - def "the same pending trace is not enqueued multiple times"() { - setup: - // Don't start the buffer thread - - when: "finish the root span" - def pendingTrace = factory.create(DDTraceId.ONE) - def span = newSpanOf(pendingTrace) - span.finish() - - then: - 1 * tracer.captureTraceConfig() >> traceConfig - pendingTrace.rootSpanWritten - pendingTrace.isEnqueued == 0 - buffer.queue.size() == 0 - _ * bufferSpy.longRunningSpansEnabled() - 1 * tracer.writeTimer() >> Monitoring.DISABLED.newTimer("") - 1 * tracer.write({ it.size() == 1 }) - 1 * tracer.getPartialFlushMinSpans() >> 10000 - _ * tracer.getTagInterceptor() - 1 * traceConfig.getServiceMapping() >> [:] - 2 * tracer.getTimeWithNanoTicks(_) - 1 * tracer.onRootSpanPublished(_) - 0 * _ - - when: "fail to fill the buffer" - for (i in 1..buffer.queue.capacity()) { - addContinuation(newSpanOf(span)).finish() - } - - then: - pendingTrace.isEnqueued == 1 - buffer.queue.size() == 1 - buffer.queue.capacity() * bufferSpy.enqueue(_) - _ * bufferSpy.longRunningSpansEnabled() - _ * tracer.getPartialFlushMinSpans() >> 10000 - _ * tracer.getTagInterceptor() - _ * traceConfig.getServiceMapping() >> [:] - _ * tracer.getTimeWithNanoTicks(_) - 0 * _ - - when: "process the buffer" - buffer.start() - - then: - new PollingConditions(timeout: 3, initialDelay: 0, delay: 0.5, factor: 1).eventually { - assert pendingTrace.isEnqueued == 0 - } - } - - def "testing tracer flare dump with multiple traces"() { - setup: - TracerFlare.addReporter {} // 
exercises default methods - def dumpReporter = Mock(PendingTraceBuffer.TracerDump) - TracerFlare.addReporter(dumpReporter) - def trace1 = factory.create(DDTraceId.ONE) - def parent1 = newSpanOf(trace1, UNSET, System.currentTimeMillis() * 1000) - def child1 = newSpanOf(parent1) - def trace2 = factory.create(DDTraceId.from(2)) - def parent2 = newSpanOf(trace2, UNSET, System.currentTimeMillis() * 2000) - def child2 = newSpanOf(parent2) - - when: "first flare dump with two traces" - parent1.finish() - parent2.finish() - buffer.start() - def entries1 = buildAndExtractZip() - - then: - 1 * dumpReporter.prepareForFlare() - 1 * dumpReporter.addReportToFlare(_) - 1 * dumpReporter.cleanupAfterFlare() - entries1.size() == 1 - def pendingTraceText1 = entries1["pending_traces.txt"] as String - pendingTraceText1.startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0') // Rest of dump is timestamp specific - - def parsedTraces1 = pendingTraceText1.split('\n').collect { new JsonSlurper().parseText(it) }.flatten() - parsedTraces1.size() == 2 - parsedTraces1[0]["trace_id"] == 1 //Asserting both traces exist - parsedTraces1[1]["trace_id"] == 2 - parsedTraces1[0]["start"] < parsedTraces1[1]["start"] //Asserting the dump has the oldest trace first - - // New pending traces are needed here because generating the first flare takes long enough that the - // earlier pending traces are flushed (within 500ms). 
- when: "second flare dump with new pending traces" - // Finish the first set of traces - child1.finish() - child2.finish() - // Create new pending traces - def trace3 = factory.create(DDTraceId.from(3)) - def parent3 = newSpanOf(trace3, UNSET, System.currentTimeMillis() * 3000) - def child3 = newSpanOf(parent3) - def trace4 = factory.create(DDTraceId.from(4)) - def parent4 = newSpanOf(trace4, UNSET, System.currentTimeMillis() * 4000) - def child4 = newSpanOf(parent4) - parent3.finish() - parent4.finish() - def entries2 = buildAndExtractZip() - - then: - 1 * dumpReporter.prepareForFlare() - 1 * dumpReporter.addReportToFlare(_) - 1 * dumpReporter.cleanupAfterFlare() - entries2.size() == 1 - def pendingTraceText2 = entries2["pending_traces.txt"] as String - def parsedTraces2 = pendingTraceText2.split('\n').collect { new JsonSlurper().parseText(it) }.flatten() - parsedTraces2.size() == 2 - - then: - child3.finish() - child4.finish() - - then: - trace1.size() == 0 - trace1.pendingReferenceCount == 0 - trace2.size() == 0 - trace2.pendingReferenceCount == 0 - trace3.size() == 0 - trace3.pendingReferenceCount == 0 - trace4.size() == 0 - trace4.pendingReferenceCount == 0 - } - - - def addContinuation(DDSpan span) { - def scope = scopeManager.activateSpan(span) - continuations << scopeManager.captureSpan(span) - scope.close() - return span - } - - static DDSpan newSpanOf(PendingTrace trace) { - return newSpanOf(trace, UNSET, 0) - } - - static DDSpan newSpanOf(PendingTrace trace, int samplingPriority, long timestampMicro) { - def context = new DDSpanContext( - trace.traceId, - 1, - DDSpanId.ZERO, - null, - "fakeService", - "fakeOperation", - "fakeResource", - samplingPriority, - null, - Collections.emptyMap(), - false, - "fakeType", - 0, - trace, - null, - null, - NoopPathwayContext.INSTANCE, - false, - PropagationTags.factory().empty()) - return DDSpan.create("test", timestampMicro, context, null) - } - - static DDSpan newSpanOf(DDSpan parent) { - def traceCollector = 
parent.context().traceCollector - def context = new DDSpanContext( - traceCollector.traceId, - 2, - parent.context().spanId, - null, - "fakeService", - "fakeOperation", - "fakeResource", - UNSET, - null, - Collections.emptyMap(), - false, - "fakeType", - 0, - traceCollector, - null, - null, - NoopPathwayContext.INSTANCE, - false, - PropagationTags.factory().empty()) - return DDSpan.create("test", 0, context, null) - } - - def buildAndExtractZip() { - TracerFlare.prepareForFlare() - def out = new ByteArrayOutputStream() - try (ZipOutputStream zip = new ZipOutputStream(out)) { - TracerFlare.addReportsToFlare(zip) - } finally { - TracerFlare.cleanupAfterFlare() - } - - def entries = [:] - - def zip = new ZipInputStream(new ByteArrayInputStream(out.toByteArray())) - def entry - while (entry = zip.nextEntry) { - def bytes = new ByteArrayOutputStream() - bytes << zip - entries.put(entry.name, entry.name.endsWith(".bin") - ? bytes.toByteArray() : new String(bytes.toByteArray(), UTF_8)) - } - - return entries - } -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceStrictWriteTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceStrictWriteTest.groovy deleted file mode 100644 index 0a51d6f2ead..00000000000 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceStrictWriteTest.groovy +++ /dev/null @@ -1,65 +0,0 @@ -package datadog.trace.core - -class PendingTraceStrictWriteTest extends PendingTraceTestBase { - - def "trace is not reported until unfinished continuation is closed"() { - when: - def scope = tracer.activateSpan(rootSpan) - def continuation = tracer.captureActiveSpan() - scope.close() - rootSpan.finish() - - then: - traceCollector.pendingReferenceCount == 1 - traceCollector.spans.asList() == [rootSpan] - writer == [] - - when: "root span buffer delay expires" - writer.waitForTracesMax(1, 1) - - then: - traceCollector.pendingReferenceCount == 1 - traceCollector.spans.asList() == [rootSpan] - writer == [] - 
writer.traceCount.get() == 0 - - when: "continuation is closed" - continuation.cancel() - - then: - traceCollector.pendingReferenceCount == 0 - traceCollector.spans.isEmpty() - writer == [[rootSpan]] - writer.traceCount.get() == 1 - } - - def "negative reference count throws an exception"() { - when: - def scope = tracer.activateSpan(rootSpan) - def continuation = tracer.captureActiveSpan() - scope.close() - rootSpan.finish() - - then: - traceCollector.pendingReferenceCount == 1 - traceCollector.spans.asList() == [rootSpan] - writer == [] - - when: "continuation is finished the first time" - continuation.cancel() - - then: - traceCollector.pendingReferenceCount == 0 - traceCollector.spans.isEmpty() - writer == [[rootSpan]] - writer.traceCount.get() == 1 - - when: "continuation is finished the second time" - // Yes this should be guarded by the used flag in the continuation, - // so remove it anyway to trigger the exception - traceCollector.removeContinuation(continuation) - - then: - thrown IllegalStateException - } -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceTest.groovy deleted file mode 100644 index 20ccaaeb680..00000000000 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceTest.groovy +++ /dev/null @@ -1,177 +0,0 @@ -package datadog.trace.core - -import datadog.environment.JavaVirtualMachine -import datadog.trace.api.DDTraceId -import datadog.trace.api.sampling.PrioritySampling -import datadog.trace.api.time.TimeSource -import datadog.trace.api.datastreams.NoopPathwayContext -import datadog.trace.core.monitor.HealthMetrics -import datadog.trace.core.propagation.PropagationTags -import spock.lang.IgnoreIf -import spock.lang.Timeout - -import java.util.concurrent.TimeUnit - -@IgnoreIf(reason = """ -Oracle JDK 1.8 did not merge the fix in JDK-8058322, leading to the JVM failing to correctly -extract method parameters without args, when the 
code is compiled on a later JDK (targeting 8). -This can manifest when creating mocks. -""", value = { - JavaVirtualMachine.isOracleJDK8() -}) -class PendingTraceTest extends PendingTraceTestBase { - - @Override - protected boolean useStrictTraceWrites() { - // This tests the behavior of the relaxed pending trace implementation - return false - } - protected DDSpan createSimpleSpan(PendingTrace trace){ - return createSimpleSpanWithID(trace,1) - } - - protected DDSpan createSimpleSpanWithID(PendingTrace trace, long id){ - return new DDSpan("test", 0L, new DDSpanContext( - DDTraceId.from(1), - id, - 0, - null, - "", - "", - "", - PrioritySampling.UNSET, - "", - [:], - false, - "", - 0, - trace, - null, - null, - NoopPathwayContext.INSTANCE, - false, - PropagationTags.factory().empty()), - null) - } - - @Timeout(value = 60, unit = TimeUnit.SECONDS) - def "trace is still reported when unfinished continuation discarded"() { - when: - def scope = tracer.activateSpan(rootSpan) - tracer.captureActiveSpan() - scope.close() - rootSpan.finish() - - then: - traceCollector.pendingReferenceCount == 1 - traceCollector.spans.asList() == [rootSpan] - writer == [] - - when: "root span buffer delay expires" - writer.waitForTraces(1) - - then: - traceCollector.pendingReferenceCount == 1 - traceCollector.spans.isEmpty() - writer == [[rootSpan]] - writer.traceCount.get() == 1 - } - def "verify healthmetrics called"() { - setup: - def tracer = Stub(CoreTracer) - def traceConfig = Stub(CoreTracer.ConfigSnapshot) - def buffer = Stub(PendingTraceBuffer) - def healthMetrics = Mock(HealthMetrics) - tracer.captureTraceConfig() >> traceConfig - traceConfig.getServiceMapping() >> [:] - PendingTrace trace = new PendingTrace(tracer, DDTraceId.from(0), buffer, Mock(TimeSource), null, false, healthMetrics) - - when: - rootSpan = createSimpleSpan(trace) - trace.registerSpan(rootSpan) - - then: - 1 * healthMetrics.onCreateSpan() - - when: - rootSpan.finish() - - then: - 1 * 
healthMetrics.onCreateTrace() - } - - def "write when writeRunningSpans is disabled: only completed spans are written"() { - setup: - def tracer = Stub(CoreTracer) - def traceConfig = Stub(CoreTracer.ConfigSnapshot) - def buffer = Stub(PendingTraceBuffer) - def healthMetrics = Stub(HealthMetrics) - tracer.captureTraceConfig() >> traceConfig - traceConfig.getServiceMapping() >> [:] - PendingTrace trace = new PendingTrace(tracer, DDTraceId.from(0), buffer, Mock(TimeSource), null, false, healthMetrics) - buffer.longRunningSpansEnabled() >> true - - def span1 = createSimpleSpanWithID(trace,39) - span1.durationNano = 31 - span1.samplingPriority = PrioritySampling.USER_KEEP - trace.registerSpan(span1) - - def unfinishedSpan = createSimpleSpanWithID(trace, 191) - trace.registerSpan(unfinishedSpan) - - def span2 = createSimpleSpanWithID(trace, 9999) - span2.durationNano = 9191 - trace.registerSpan(span2) - def traceToWrite = new ArrayList<>(0) - - when: - def completedSpans = trace.enqueueSpansToWrite(traceToWrite, false) - - then: - completedSpans == 2 - traceToWrite.size() == 2 - traceToWrite.containsAll([span1, span2]) - trace.spans.size() == 1 - trace.spans.pop() == unfinishedSpan - } - - def "write when writeRunningSpans is enabled: complete and running spans are written"() { - setup: - def tracer = Stub(CoreTracer) - def traceConfig = Stub(CoreTracer.ConfigSnapshot) - def buffer = Stub(PendingTraceBuffer) - def healthMetrics = Stub(HealthMetrics) - tracer.captureTraceConfig() >> traceConfig - traceConfig.getServiceMapping() >> [:] - PendingTrace trace = new PendingTrace(tracer, DDTraceId.from(0), buffer, Mock(TimeSource), null, false, healthMetrics) - buffer.longRunningSpansEnabled() >> true - - def span1 = createSimpleSpanWithID(trace,39) - span1.durationNano = 31 - span1.samplingPriority = PrioritySampling.USER_KEEP - trace.registerSpan(span1) - - def unfinishedSpan = createSimpleSpanWithID(trace, 191) - trace.registerSpan(unfinishedSpan) - - def span2 = 
createSimpleSpanWithID(trace, 9999) - span2.setServiceName("9191") - span2.durationNano = 9191 - trace.registerSpan(span2) - - def unfinishedSpan2 = createSimpleSpanWithID(trace, 77771) - trace.registerSpan(unfinishedSpan2) - - def traceToWrite = new ArrayList<>(0) - - when: - def completedSpans = trace.enqueueSpansToWrite(traceToWrite, true) - - then: - completedSpans == 2 - traceToWrite.size() == 4 - traceToWrite.containsAll([span1, span2, unfinishedSpan, unfinishedSpan2]) - trace.spans.size() == 2 - trace.spans.containsAll([unfinishedSpan, unfinishedSpan2]) - } -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceTestBase.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceTestBase.groovy deleted file mode 100644 index 14926e83919..00000000000 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceTestBase.groovy +++ /dev/null @@ -1,267 +0,0 @@ -package datadog.trace.core - -import ch.qos.logback.classic.Level -import ch.qos.logback.classic.Logger -import datadog.trace.common.writer.ListWriter -import datadog.trace.core.test.DDCoreSpecification -import org.slf4j.LoggerFactory - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -import static datadog.trace.api.config.TracerConfig.PARTIAL_FLUSH_MIN_SPANS - -abstract class PendingTraceTestBase extends DDCoreSpecification { - - def writer = new ListWriter() - def tracer = tracerBuilder().writer(writer).build() - - DDSpan rootSpan = tracer.buildSpan("datadog", "fakeOperation").start() - PendingTrace traceCollector = rootSpan.context().traceCollector - - def setup() { - assert traceCollector.size() == 0 - assert traceCollector.pendingReferenceCount == 1 - assert traceCollector.rootSpanWritten == false - } - - def cleanup() { - tracer?.close() - } - - def "single span gets added to trace and written when finished"() { - when: - rootSpan.finish() - writer.waitForTraces(1) - - then: - traceCollector.spans.isEmpty() - writer == 
[[rootSpan]] - writer.traceCount.get() == 1 - } - - def "child finishes before parent"() { - when: - def child = tracer.buildSpan("datadog", "child").asChildOf(rootSpan).start() - - then: - traceCollector.pendingReferenceCount == 2 - - when: - child.finish() - - then: - traceCollector.pendingReferenceCount == 1 - traceCollector.spans.asList() == [child] - writer == [] - - when: - rootSpan.finish() - writer.waitForTraces(1) - - then: - traceCollector.pendingReferenceCount == 0 - traceCollector.spans.isEmpty() - writer == [[rootSpan, child]] - writer.traceCount.get() == 1 - } - - def "parent finishes before child which holds up trace"() { - when: - def child = tracer.buildSpan("datadog", "child").asChildOf(rootSpan).start() - - then: - traceCollector.pendingReferenceCount == 2 - - when: - rootSpan.finish() - - then: - traceCollector.pendingReferenceCount == 1 - traceCollector.spans.asList() == [rootSpan] - writer == [] - - when: - child.finish() - writer.waitForTraces(1) - - then: - traceCollector.pendingReferenceCount == 0 - traceCollector.spans.isEmpty() - writer == [[child, rootSpan]] - writer.traceCount.get() == 1 - } - - def "child spans created after trace written reported separately"() { - setup: - rootSpan.finish() - // this shouldn't happen, but it's possible users of the api - // may incorrectly add spans after the trace is reported. 
- // in those cases we should still decrement the pending trace count - DDSpan childSpan = tracer.buildSpan("datadog", "child").asChildOf(rootSpan).start() - childSpan.finish() - writer.waitForTraces(2) - - expect: - traceCollector.pendingReferenceCount == 0 - traceCollector.spans.isEmpty() - writer == [[rootSpan], [childSpan]] - } - - def "test getCurrentTimeNano"() { - expect: - // Generous 5 seconds to execute this test - Math.abs(TimeUnit.NANOSECONDS.toSeconds(traceCollector.currentTimeNano) - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())) < 5 - } - - def "partial flush"() { - when: - injectSysConfig(PARTIAL_FLUSH_MIN_SPANS, "2") - def quickTracer = tracerBuilder().writer(writer).build() - def rootSpan = quickTracer.buildSpan("datadog", "root").start() - def traceCollector = rootSpan.context().traceCollector - def child1 = quickTracer.buildSpan("datadog", "child1").asChildOf(rootSpan).start() - def child2 = quickTracer.buildSpan("datadog", "child2").asChildOf(rootSpan).start() - - then: - traceCollector.pendingReferenceCount == 3 - - when: - child2.finish() - - then: - traceCollector.pendingReferenceCount == 2 - traceCollector.spans.asList() == [child2] - writer == [] - writer.traceCount.get() == 0 - - when: - child1.finish() - writer.waitForTraces(1) - - then: - traceCollector.pendingReferenceCount == 1 - traceCollector.spans.asList() == [] - writer == [[child1, child2]] - writer.traceCount.get() == 1 - - when: - rootSpan.finish() - writer.waitForTraces(2) - - then: - traceCollector.pendingReferenceCount == 0 - traceCollector.spans.isEmpty() - writer == [[child1, child2], [rootSpan]] - writer.traceCount.get() == 2 - - cleanup: - quickTracer.close() - } - - def "partial flush with root span closed last"() { - when: - injectSysConfig(PARTIAL_FLUSH_MIN_SPANS, "2") - def quickTracer = tracerBuilder().writer(writer).build() - def rootSpan = quickTracer.buildSpan("datadog", "root").start() - def trace = rootSpan.context().traceCollector - def child1 = 
quickTracer.buildSpan("datadog", "child1").asChildOf(rootSpan).start() - def child2 = quickTracer.buildSpan("datadog", "child2").asChildOf(rootSpan).start() - - then: - trace.pendingReferenceCount == 3 - - when: - child1.finish() - - then: - trace.pendingReferenceCount == 2 - trace.spans.asList() == [child1] - writer == [] - writer.traceCount.get() == 0 - - when: - child2.finish() - writer.waitForTraces(1) - - then: - trace.pendingReferenceCount == 1 - trace.spans.isEmpty() - writer == [[child2, child1]] - writer.traceCount.get() == 1 - - when: - rootSpan.finish() - writer.waitForTraces(2) - - then: - trace.pendingReferenceCount == 0 - trace.spans.isEmpty() - writer == [[child2, child1], [rootSpan]] - writer.traceCount.get() == 2 - - cleanup: - quickTracer.close() - } - - def "partial flush concurrency test"() { - // reduce logging noise - def logger = (Logger) LoggerFactory.getLogger("datadog.trace") - def previousLevel = logger.level - logger.setLevel(Level.OFF) - - setup: - def latch = new CountDownLatch(1) - def rootSpan = tracer.buildSpan("test", "root").start() - PendingTrace traceCollector = rootSpan.context().traceCollector - def exceptions = [] - def threads = (1..threadCount).collect { - Thread.start { - try { - latch.await() - def spans = (1..spanCount).collect { - tracer.startSpan("test", "child", rootSpan.context()) - } - spans.each { - it.finish() - } - } catch (Throwable ex) { - exceptions << ex - } - } - } - - when: - // Finish root span so other spans are queued automatically - rootSpan.finish() - - then: - writer.waitForTraces(1) - - when: - latch.countDown() - threads.each { - it.join() - } - traceCollector.pendingTraceBuffer.flush() - logger.setLevel(previousLevel) - - then: - exceptions.isEmpty() - traceCollector.pendingReferenceCount == 0 - writer.sum { it.size() } == threadCount * spanCount + 1 - - cleanup: - logger.setLevel(previousLevel) - - where: - threadCount | spanCount - 1 | 1 - 2 | 1 - 1 | 2 - // Sufficiently large to fill the buffer: 
- 5 | 2000 - 10 | 1000 - 50 | 500 - } -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/TraceInterceptorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/TraceInterceptorTest.groovy deleted file mode 100644 index 04fbccf41d8..00000000000 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/TraceInterceptorTest.groovy +++ /dev/null @@ -1,221 +0,0 @@ -package datadog.trace.core - -import datadog.trace.TestInterceptor -import datadog.trace.api.GlobalTracer -import datadog.trace.api.config.TracerConfig -import datadog.trace.api.interceptor.MutableSpan -import datadog.trace.api.interceptor.TraceInterceptor -import datadog.trace.common.writer.ListWriter -import datadog.trace.core.test.DDCoreSpecification -import spock.lang.Timeout - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean - -@Timeout(10) -class TraceInterceptorTest extends DDCoreSpecification { - - def writer = new ListWriter() - def tracer - - def setup() { - injectSysConfig(TracerConfig.TRACE_GIT_METADATA_ENABLED, "false") - tracer = tracerBuilder().writer(writer).build() - } - - def cleanup() { - tracer?.close() - } - - def "interceptor is registered as a service"() { - expect: - tracer.interceptors.interceptors()[0] instanceof TestInterceptor - } - - def "interceptors with the same priority replaced"() { - setup: - int priority = 999 - ((TestInterceptor) tracer.interceptors.interceptors()[0]).priority = priority - tracer.interceptors.add(new TraceInterceptor() { - @Override - Collection onTraceComplete(Collection trace) { - return [] - } - - @Override - int priority() { - return priority - } - }) - - when: - def interceptors = tracer.interceptors.interceptors() - - then: - interceptors.length == 1 - interceptors[0] instanceof TestInterceptor - } - - def "interceptors with different priority sorted"() { - setup: - def priority = score - def existingInterceptor = 
tracer.interceptors.interceptors()[0] - def newInterceptor = new TraceInterceptor() { - @Override - Collection onTraceComplete(Collection trace) { - return [] - } - - @Override - int priority() { - return priority - } - } - tracer.interceptors.add(newInterceptor) - - expect: - Arrays.asList(tracer.interceptors.interceptors()) == reverse ? [newInterceptor, existingInterceptor]: [existingInterceptor, newInterceptor] - - where: - score | reverse - -1 | false - 1 | true - } - - def "interceptor can discard a trace (p=#score)"() { - setup: - def called = new AtomicBoolean(false) - def latch = new CountDownLatch(1) - def priority = score - tracer.interceptors.add(new TraceInterceptor() { - @Override - Collection onTraceComplete(Collection trace) { - called.set(true) - latch.countDown() - return [] - } - - @Override - int priority() { - return priority - } - }) - tracer.buildSpan("datadog", "test " + score).start().finish() - if (score == TestInterceptor.priority) { - // the interceptor didn't get added, so latch will never be released. - writer.waitForTraces(1) - } else { - latch.await(5, TimeUnit.SECONDS) - } - - when: - def interceptors = tracer.interceptors.interceptors() - - then: - interceptors.length == expectedSize - (called.get()) == (score != TestInterceptor.priority) - (writer == []) == (score != TestInterceptor.priority) - - where: - score | expectedSize| _ - TestInterceptor.priority-1 | 2| _ - TestInterceptor.priority | 1| _ // This conflicts with TestInterceptor, so it won't be added. 
- TestInterceptor.priority+1 | 2| _ - } - - def "interceptor can modify a span"() { - setup: - tracer.interceptors.add(new TraceInterceptor() { - @Override - Collection onTraceComplete(Collection trace) { - for (MutableSpan span : trace) { - span - .setOperationName("modifiedON-" + span.getOperationName()) - .setServiceName("modifiedSN-" + span.getServiceName()) - .setResourceName("modifiedRN-" + span.getResourceName()) - .setSpanType("modifiedST-" + span.getSpanType()) - .setTag("boolean-tag", true) - .setTag("number-tag", 5.0) - .setTag("string-tag", "howdy") - .setError(true) - } - return trace - } - - @Override - int priority() { - return 1 - } - }) - tracer.buildSpan("datadog", "test").start().finish() - writer.waitForTraces(1) - - expect: - def trace = writer.firstTrace() - trace.size() == 1 - - def span = trace[0] - - span.context().operationName == "modifiedON-test" - span.serviceName.startsWith("modifiedSN-") - span.resourceName.toString() == "modifiedRN-modifiedON-test" - span.type == "modifiedST-null" - span.context().getErrorFlag() - - def tags = span.context().tags - - tags["boolean-tag"] == true - tags["number-tag"] == 5.0 - tags["string-tag"] == "howdy" - - tags["thread.name"] != null - tags["thread.id"] != null - tags["runtime-id"] != null - tags["language"] != null - tags.size() >= 7 - } - - def "should be robust when interceptor return a null trace"() { - setup: - tracer.interceptors.add(new TraceInterceptor() { - @Override - Collection onTraceComplete(Collection trace) { - null - } - - @Override - int priority() { - return 0 - } - }) - - when: - DDSpan span = (DDSpan) tracer.startSpan("test", "test") - span.phasedFinish() - tracer.write(SpanList.of(span)) - - then: - notThrown(Throwable) - } - - def "register interceptor through bridge"() { - setup: - GlobalTracer.registerIfAbsent(tracer) - def interceptor = new TraceInterceptor() { - @Override - Collection onTraceComplete(Collection trace) { - return trace - } - - @Override - int priority() { - 
return 38 - } - } - - expect: - GlobalTracer.get().addTraceInterceptor(interceptor) - Arrays.asList(tracer.interceptors.interceptors()).contains(interceptor) - } -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/TracingConfigPollerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/TracingConfigPollerTest.groovy deleted file mode 100644 index 2858fa61f4f..00000000000 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/TracingConfigPollerTest.groovy +++ /dev/null @@ -1,291 +0,0 @@ -package datadog.trace.core - -import datadog.communication.ddagent.DDAgentFeaturesDiscovery -import datadog.communication.ddagent.SharedCommunicationObjects -import datadog.metrics.api.Monitoring -import datadog.remoteconfig.ConfigurationPoller -import datadog.remoteconfig.Product -import datadog.remoteconfig.state.ParsedConfigKey -import datadog.remoteconfig.state.ProductListener -import datadog.trace.api.datastreams.DataStreamsTransactionExtractor -import datadog.trace.core.test.DDCoreSpecification -import java.nio.charset.StandardCharsets -import okhttp3.HttpUrl -import okhttp3.OkHttpClient -import spock.lang.Timeout - -@Timeout(10) -class TracingConfigPollerTest extends DDCoreSpecification { - - def "test mergeLibConfigs with null and non-null values"() { - setup: - def config1 = new TracingConfigPoller.LibConfig() // all nulls - def config2 = new TracingConfigPoller.LibConfig( - tracingEnabled: true, - debugEnabled: false, - runtimeMetricsEnabled: true, - logsInjectionEnabled: false, - dataStreamsEnabled: true, - traceSampleRate: 0.5, - dynamicInstrumentationEnabled: true, - exceptionReplayEnabled: false, - codeOriginEnabled: true, - liveDebuggingEnabled: false - ) - def config3 = new TracingConfigPoller.LibConfig( - tracingEnabled: false, - debugEnabled: true, - runtimeMetricsEnabled: false, - logsInjectionEnabled: true, - dataStreamsEnabled: false, - traceSampleRate: 0.8, - dynamicInstrumentationEnabled: false, - exceptionReplayEnabled: true, - 
codeOriginEnabled: false, - liveDebuggingEnabled: true - ) - - when: - def merged = TracingConfigPoller.LibConfig.mergeLibConfigs([config1, config2, config3]) - - then: - merged != null - // Should take first non-null values from config2 - merged.tracingEnabled == true - merged.debugEnabled == false - merged.runtimeMetricsEnabled == true - merged.logsInjectionEnabled == false - merged.dataStreamsEnabled == true - merged.traceSampleRate == 0.5 - merged.dynamicInstrumentationEnabled == true - merged.exceptionReplayEnabled == false - merged.codeOriginEnabled == true - merged.liveDebuggingEnabled == false - } - - def "test config priority calculation"() { - setup: - def configOverrides = new TracingConfigPoller.ConfigOverrides() - if (service != null || env != null) { - configOverrides.serviceTarget = new TracingConfigPoller.ServiceTarget( - service: service, - env: env, - ) - } - if (clusterName != null) { - configOverrides.k8sTargetV2 = new TracingConfigPoller.K8sTargetV2( - clusterTargets: [ - new TracingConfigPoller.ClusterTarget( - clusterName: clusterName, - enabled: true, - ) - ] - ) - } - configOverrides.libConfig = new TracingConfigPoller.LibConfig() - - when: - def priority = configOverrides.getOverridePriority() - - then: - priority == expectedPriority - - where: - service | env | clusterName | expectedPriority - "test-service" | "staging" | null | 5 - "test-service" | "*" | null | 4 - "*" | "staging" | null | 3 - null | null | "test-cluster" | 2 - "*" | "*" | null | 1 - } - - - def "test actual config commit with service and org level configs"() { - setup: - def orgKey = ParsedConfigKey.parse("datadog/2/APM_TRACING/org_config/config") - def serviceKey = ParsedConfigKey.parse("datadog/2/APM_TRACING/service_config/config") - def poller = Mock(ConfigurationPoller) - def sco = new SharedCommunicationObjects( - agentHttpClient: Mock(OkHttpClient), - monitoring: Mock(Monitoring), - agentUrl: HttpUrl.get('https://example.com'), - featuresDiscovery: 
Mock(DDAgentFeaturesDiscovery), - configurationPoller: poller - ) - - def updater - - when: - def tracer = CoreTracer.builder() - .sharedCommunicationObjects(sco) - .pollForTracingConfiguration() - .build() - - then: - 1 * poller.addListener(Product.APM_TRACING, _ as ProductListener) >> { - updater = it[1] // capture config updater for further testing - } - and: - tracer.captureTraceConfig().serviceMapping == [:] - tracer.captureTraceConfig().traceSampleRate == null - - when: - // Add org level config (priority 1) - should set service mapping - updater.accept(orgKey, """ - { - "service_target": { - "service": "*", - "env": "*" - }, - "lib_config": { - "tracing_service_mapping": [ - { - "from_key": "org-service", - "to_name": "org-mapped" - } - ], - "tracing_sampling_rate": 0.7 - } - } - """.getBytes(StandardCharsets.UTF_8), null) - - // Add service level config (priority 4) - should override service mapping and add header tags - updater.accept(serviceKey, """ - { - "service_target": { - "service": "test-service", - "env": "*" - }, - "lib_config": { - "tracing_service_mapping": [ - { - "from_key": "service-specific", - "to_name": "service-mapped" - } - ], - "tracing_header_tags": [ - { - "header": "X-Custom-Header", - "tag_name": "custom.header" - } - ], - "tracing_sampling_rate": 1.3, - "data_streams_transaction_extractors": [ - { - "name": "test", - "type": "unknown", - "value": "value" - } - ] - } - } - """.getBytes(StandardCharsets.UTF_8), null) - - // Commit both configs - updater.commit() - - then: - // Service level config should take precedence due to higher priority (4 vs 1) - tracer.captureTraceConfig().serviceMapping == ["service-specific": "service-mapped"] - tracer.captureTraceConfig().traceSampleRate == 1.0 // should be clamped to 1.0 - tracer.captureTraceConfig().requestHeaderTags == ["x-custom-header": "custom.header"] - tracer.captureTraceConfig().responseHeaderTags == ["x-custom-header": "custom.header"] - 
tracer.captureTraceConfig().getDataStreamsTransactionExtractors().size() == 1 - tracer.captureTraceConfig().getDataStreamsTransactionExtractors()[0].name == "test" - tracer.captureTraceConfig().getDataStreamsTransactionExtractors()[0].type == DataStreamsTransactionExtractor.Type.UNKNOWN - tracer.captureTraceConfig().getDataStreamsTransactionExtractors()[0].value == "value" - - when: - // Remove service level config - updater.remove(serviceKey, null) - updater.commit() - - then: - // Should fall back to org level config - tracer.captureTraceConfig().serviceMapping == ["org-service": "org-mapped"] - tracer.captureTraceConfig().traceSampleRate == 0.7 - tracer.captureTraceConfig().requestHeaderTags == [:] - tracer.captureTraceConfig().responseHeaderTags == [:] - - when: - // Remove org level config - updater.remove(orgKey, null) - updater.commit() - - then: - // Should have no configs - tracer.captureTraceConfig().serviceMapping == [:] - tracer.captureTraceConfig().traceSampleRate == null - - cleanup: - tracer?.close() - } - - def "test two org levels config setting different flags works"() { - setup: - def orgConfig1Key = ParsedConfigKey.parse("datadog/2/APM_TRACING/org_config/config1") - def orgConfig2Key = ParsedConfigKey.parse("datadog/2/APM_TRACING/org_config/config2") - def poller = Mock(ConfigurationPoller) - def sco = new SharedCommunicationObjects( - agentHttpClient: Mock(OkHttpClient), - monitoring: Mock(Monitoring), - agentUrl: HttpUrl.get('https://example.com'), - featuresDiscovery: Mock(DDAgentFeaturesDiscovery), - configurationPoller: poller - ) - - def updater - - when: - def tracer = CoreTracer.builder() - .sharedCommunicationObjects(sco) - .pollForTracingConfiguration() - .build() - - then: - 1 * poller.addListener(Product.APM_TRACING, _ as ProductListener) >> { - updater = it[1] // capture config updater for further testing - } - and: - tracer.captureTraceConfig().isTraceEnabled() == true - tracer.captureTraceConfig().isDataStreamsEnabled() == false - 
- when: - // Add org level config with ApmTracing enabled - updater.accept(orgConfig1Key, """ - { - "service_target": { - "service": "*", - "env": "*" - }, - "lib_config": { - "tracing_enabled": true - } - } - """.getBytes(StandardCharsets.UTF_8), null) - - // Add second org level config with DataStreams enabled - updater.accept(orgConfig2Key, """ - { - "service_target": { - "service": "*", - "env": "*" - }, - "lib_config": { - "data_streams_enabled": true - } - } - """.getBytes(StandardCharsets.UTF_8), null) - - // Commit both configs - updater.commit() - - then: - // Both org level configs should be merged, with data streams enabled - tracer.captureTraceConfig().isTraceEnabled() == true - tracer.captureTraceConfig().isDataStreamsEnabled() == true - - cleanup: - tracer?.close() - } -} diff --git a/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceBufferTest.java b/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceBufferTest.java new file mode 100644 index 00000000000..e3b03b53c15 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceBufferTest.java @@ -0,0 +1,605 @@ +package datadog.trace.core; + +import static datadog.trace.api.sampling.PrioritySampling.UNSET; +import static datadog.trace.api.sampling.PrioritySampling.USER_KEEP; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static 
org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import datadog.environment.JavaVirtualMachine; +import datadog.metrics.api.Monitoring; +import datadog.trace.SamplingPriorityMetadataChecker; +import datadog.trace.api.Config; +import datadog.trace.api.DDSpanId; +import datadog.trace.api.DDTraceId; +import datadog.trace.api.datastreams.NoopPathwayContext; +import datadog.trace.api.flare.TracerFlare; +import datadog.trace.api.time.SystemTimeSource; +import datadog.trace.context.TraceScope; +import datadog.trace.core.monitor.HealthMetrics; +import datadog.trace.core.propagation.PropagationTags; +import datadog.trace.core.scopemanager.ContinuableScopeManager; +import datadog.trace.test.util.DDJavaSpecification; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(5) +public class PendingTraceBufferTest extends DDJavaSpecification { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private PendingTraceBuffer buffer; + private PendingTraceBuffer bufferSpy; + private PendingTraceBuffer.DelayingPendingTraceBuffer delayingBuffer; + private CoreTracer tracer; + private CoreTracer.ConfigSnapshot 
traceConfig; + private ContinuableScopeManager scopeManager; + private PendingTrace.Factory factory; + private List continuations; + + @BeforeAll + static void checkJvm() { + Assumptions.assumeFalse( + JavaVirtualMachine.isOracleJDK8(), + "Oracle JDK 1.8 did not merge the fix in JDK-8058322, leading to the JVM failing to" + + " correctly extract method parameters without args, when the code is compiled on a" + + " later JDK (targeting 8). This can manifest when creating mocks."); + } + + @BeforeEach + void setup() { + tracer = mock(CoreTracer.class); + traceConfig = mock(CoreTracer.ConfigSnapshot.class); + buffer = PendingTraceBuffer.delaying(SystemTimeSource.INSTANCE, mock(Config.class), null, null); + bufferSpy = spy(buffer); + delayingBuffer = (PendingTraceBuffer.DelayingPendingTraceBuffer) buffer; + scopeManager = new ContinuableScopeManager(10, true); + factory = + new PendingTrace.Factory( + tracer, bufferSpy, SystemTimeSource.INSTANCE, false, HealthMetrics.NO_OP); + continuations = new ArrayList<>(); + + when(tracer.captureTraceConfig()).thenReturn(traceConfig); + when(traceConfig.getServiceMapping()).thenReturn(Collections.emptyMap()); + when(tracer.getPartialFlushMinSpans()).thenReturn(10); + when(tracer.writeTimer()).thenReturn(Monitoring.DISABLED.newTimer("")); + when(tracer.getTimeWithNanoTicks(anyLong())).thenAnswer(inv -> inv.getArgument(0)); + } + + @AfterEach + void cleanup() throws InterruptedException { + buffer.close(); + delayingBuffer.getWorker().join(1000); + } + + @Test + void testBufferLifecycle() throws InterruptedException { + assertFalse(delayingBuffer.getWorker().isAlive()); + + buffer.start(); + + assertTrue(delayingBuffer.getWorker().isAlive()); + assertTrue(delayingBuffer.getWorker().isDaemon()); + + assertThrows(IllegalThreadStateException.class, () -> buffer.start()); + assertTrue(delayingBuffer.getWorker().isAlive()); + assertTrue(delayingBuffer.getWorker().isDaemon()); + + buffer.close(); + delayingBuffer.getWorker().join(1000); + 
+ assertFalse(delayingBuffer.getWorker().isAlive()); + } + + @Test + void continuationBuffersRoot() { + PendingTrace trace = factory.create(DDTraceId.ONE); + DDSpan span = newSpanOf(trace); + + assertFalse(trace.isRootSpanWritten()); + + addContinuation(span); + span.finish(); // This should enqueue + + assertEquals(1, continuations.size()); + assertEquals(1, trace.getPendingReferenceCount()); + verify(bufferSpy).enqueue(trace); + verify(tracer).onRootSpanPublished(span); + + clearInvocations(bufferSpy, tracer, traceConfig); + + continuations.get(0).cancel(); + + assertEquals(0, trace.getPendingReferenceCount()); + verify(tracer).write(any()); + verify(tracer).writeTimer(); + } + + @Test + void unfinishedChildBuffersRoot() { + PendingTrace trace = factory.create(DDTraceId.ONE); + DDSpan parent = newSpanOf(trace); + DDSpan child = newSpanOf(parent); + + assertFalse(trace.isRootSpanWritten()); + + parent.finish(); // This should enqueue + + assertEquals(1, trace.size()); + assertEquals(1, trace.getPendingReferenceCount()); + verify(bufferSpy).enqueue(trace); + verify(tracer).onRootSpanPublished(parent); + + clearInvocations(bufferSpy, tracer); + + child.finish(); + + assertEquals(0, trace.size()); + assertEquals(0, trace.getPendingReferenceCount()); + verify(tracer).write(any()); + verify(tracer).writeTimer(); + } + + @Test + void prioritySamplingIsAlwaysSent() { + SamplingPriorityMetadataChecker metadataChecker = new SamplingPriorityMetadataChecker(); + doAnswer( + invocation -> { + List spans = invocation.getArgument(0); + spans.get(0).processTagsAndBaggage(metadataChecker); + return null; + }) + .when(tracer) + .write(any()); + + DDSpan parent = addContinuation(newSpanOf(factory.create(DDTraceId.ONE), USER_KEEP, 0)); + // Fill the buffer - Only children - Priority taken from root + for (int i = 0; i < 11; i++) { + newSpanOf(parent).finish(); + } + + verify(tracer).write(any()); + assertTrue(metadataChecker.hasSamplingPriority); + } + + @Test + void 
bufferFullYieldsImmediateWrite() { + int capacity = delayingBuffer.getQueue().capacity(); + + // Fill the buffer + for (int i = 0; i < capacity; i++) { + addContinuation(newSpanOf(factory.create(DDTraceId.ONE))).finish(); + } + + assertEquals(capacity, delayingBuffer.getQueue().size()); + verify(bufferSpy, times(capacity)).enqueue(any()); + verify(tracer, times(capacity)).onRootSpanPublished(any()); + + clearInvocations(bufferSpy, tracer, traceConfig); + + PendingTrace pendingTrace = factory.create(DDTraceId.ONE); + addContinuation(newSpanOf(pendingTrace)).finish(); + + verify(bufferSpy).enqueue(any()); + verify(tracer).write(any()); + verify(tracer).onRootSpanPublished(any()); + assertEquals(0, pendingTrace.getIsEnqueued()); + } + + @Test + void longRunningTraceBufferFullDoesNotTriggerWrite() { + int capacity = delayingBuffer.getQueue().capacity(); + + // Fill the buffer + for (int i = 0; i < capacity; i++) { + addContinuation(newSpanOf(factory.create(DDTraceId.ONE))).finish(); + } + + assertEquals(capacity, delayingBuffer.getQueue().size()); + verify(bufferSpy, times(capacity)).enqueue(any()); + + clearInvocations(bufferSpy, tracer, traceConfig); + + PendingTrace pendingTrace = factory.create(DDTraceId.ONE); + pendingTrace.setLongRunningTrackedState(LongRunningTracesTracker.TO_TRACK); + addContinuation(newSpanOf(pendingTrace)).finish(); + + verify(bufferSpy).enqueue(any()); + verify(tracer, never()).write(any()); + verify(tracer).onRootSpanPublished(any()); + assertEquals(0, pendingTrace.getIsEnqueued()); + } + + @Test + void continuationAllowsAddingAfterRootFinished() { + PendingTrace trace = factory.create(DDTraceId.ONE); + DDSpan parent = addContinuation(newSpanOf(trace)); + TraceScope.Continuation continuation = continuations.get(0); + + assertEquals(1, continuations.size()); + + parent.finish(); // This should enqueue + + assertEquals(1, trace.size()); + assertEquals(1, trace.getPendingReferenceCount()); + assertFalse(trace.isRootSpanWritten()); + 
verify(bufferSpy).enqueue(trace); + verify(tracer).onRootSpanPublished(parent); + + clearInvocations(bufferSpy, tracer); + + DDSpan child = newSpanOf(parent); + child.finish(); + + assertEquals(2, trace.size()); + assertEquals(1, trace.getPendingReferenceCount()); + assertFalse(trace.isRootSpanWritten()); + + // Don't start the buffer thread here. When the continuation is cancelled, + // pendingReferenceCount drops to 0 with rootSpanWritten still false, so + // write() is called synchronously on this thread. Starting the buffer + // would introduce a race where the worker thread could process the + // enqueued trace before continuation.cancel(), causing unexpected interactions. + continuation.cancel(); + + assertEquals(0, trace.size()); + assertEquals(0, trace.getPendingReferenceCount()); + assertTrue(trace.isRootSpanWritten()); + verify(tracer).write(any()); + verify(tracer).writeTimer(); + } + + @Test + void lateArrivalSpanRequeuesPendingTrace() throws InterruptedException { + buffer.start(); + CountDownLatch parentLatch = new CountDownLatch(1); + CountDownLatch childLatch = new CountDownLatch(1); + + PendingTrace trace = factory.create(DDTraceId.ONE); + DDSpan parent = newSpanOf(trace); + + doAnswer( + invocation -> { + parentLatch.countDown(); + return null; + }) + .doAnswer( + invocation -> { + childLatch.countDown(); + return null; + }) + .when(tracer) + .write(any()); + + parent.finish(); // This should enqueue + assertTrue(parentLatch.await(3, TimeUnit.SECONDS)); + + assertEquals(0, trace.size()); + assertEquals(0, trace.getPendingReferenceCount()); + assertTrue(trace.isRootSpanWritten()); + verify(tracer).write(any()); + verify(tracer).onRootSpanPublished(parent); + + clearInvocations(bufferSpy, tracer, traceConfig); + + DDSpan child = newSpanOf(parent); + child.finish(); + assertTrue(childLatch.await(3, TimeUnit.SECONDS)); + + assertEquals(0, trace.size()); + assertEquals(0, trace.getPendingReferenceCount()); + assertTrue(trace.isRootSpanWritten()); + 
verify(bufferSpy, atLeastOnce()).enqueue(trace); + verify(tracer).write(any()); + } + + @Test + void flushClearsTheBuffer() throws InterruptedException { + buffer.start(); + AtomicInteger counter = new AtomicInteger(0); + // Create a fake element that newer gets written + PendingTraceBuffer.Element element = + new PendingTraceBuffer.Element() { + @Override + public long oldestFinishedTime() { + return TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); + } + + @Override + public boolean lastReferencedNanosAgo(long nanos) { + return false; + } + + @Override + public void write() { + counter.incrementAndGet(); + } + + @Override + public DDSpan getRootSpan() { + return null; + } + + @Override + public boolean setEnqueued(boolean enqueued) { + return true; + } + + @Override + public boolean writeOnBufferFull() { + return true; + } + }; + + bufferSpy.enqueue(element); + bufferSpy.enqueue(element); + bufferSpy.enqueue(element); + + assertEquals(0, counter.get()); + + buffer.flush(); + + assertEquals(3, counter.get()); + } + + @Test + void samePendingTraceIsNotEnqueuedMultipleTimes() { + when(tracer.getPartialFlushMinSpans()).thenReturn(10000); + + PendingTrace pendingTrace = factory.create(DDTraceId.ONE); + DDSpan span = newSpanOf(pendingTrace); + span.finish(); + + assertTrue(pendingTrace.isRootSpanWritten()); + assertEquals(0, pendingTrace.getIsEnqueued()); + assertEquals(0, delayingBuffer.getQueue().size()); + verify(tracer).write(any()); + + clearInvocations(bufferSpy, tracer, traceConfig); + + int capacity = delayingBuffer.getQueue().capacity(); + // fail to fill the buffer + for (int i = 0; i < capacity; i++) { + addContinuation(newSpanOf(span)).finish(); + } + + assertEquals(1, pendingTrace.getIsEnqueued()); + assertEquals(1, delayingBuffer.getQueue().size()); + verify(bufferSpy, times(capacity)).enqueue(any()); + + // process the buffer + buffer.start(); + + long deadline = System.currentTimeMillis() + 3000; + while (pendingTrace.getIsEnqueued() != 0 && 
System.currentTimeMillis() < deadline) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + assertEquals(0, pendingTrace.getIsEnqueued()); + } + + @Test + void testingTracerFlareDumpWithMultipleTraces() throws IOException, InterruptedException { + TracerFlare.addReporter(zip -> {}); // exercises default methods + TracerFlare.Reporter dumpReporter = mock(TracerFlare.Reporter.class); + TracerFlare.addReporter(dumpReporter); + + PendingTrace trace1 = factory.create(DDTraceId.ONE); + DDSpan parent1 = newSpanOf(trace1, UNSET, System.currentTimeMillis() * 1000); + DDSpan child1 = newSpanOf(parent1); + PendingTrace trace2 = factory.create(DDTraceId.from(2)); + DDSpan parent2 = newSpanOf(trace2, UNSET, System.currentTimeMillis() * 2000); + DDSpan child2 = newSpanOf(parent2); + // first flare dump with two traces + parent1.finish(); + parent2.finish(); + buffer.start(); + Map entries1 = buildAndExtractZip(); + + verify(dumpReporter).prepareForFlare(); + verify(dumpReporter).addReportToFlare(any()); + verify(dumpReporter).cleanupAfterFlare(); + + assertEquals(1, entries1.size()); + String pendingTraceText1 = (String) entries1.get("pending_traces.txt"); + assertTrue( + pendingTraceText1.startsWith( + "[{\"service\":\"fakeService\",\"name\":\"fakeOperation\",\"resource\":\"fakeResource\",\"trace_id\":1,\"span_id\":1,\"parent_id\":0")); + + List> parsedTraces1 = parseTraceLines(pendingTraceText1); + assertEquals(2, parsedTraces1.size()); + assertEquals(1L, ((Number) parsedTraces1.get(0).get("trace_id")).longValue()); + assertEquals(2L, ((Number) parsedTraces1.get(1).get("trace_id")).longValue()); + assertTrue( + ((Number) parsedTraces1.get(0).get("start")).longValue() + < ((Number) parsedTraces1.get(1).get("start")).longValue()); + + clearInvocations(dumpReporter); + + // New pending traces are needed here because generating the first flare takes long enough that + // the + // earlier pending traces are 
flushed (within 500ms). + // second flare dump with new pending traces + // Finish the first set of traces + child1.finish(); + child2.finish(); + // Create new pending traces + PendingTrace trace3 = factory.create(DDTraceId.from(3)); + DDSpan parent3 = newSpanOf(trace3, UNSET, System.currentTimeMillis() * 3000); + DDSpan child3 = newSpanOf(parent3); + PendingTrace trace4 = factory.create(DDTraceId.from(4)); + DDSpan parent4 = newSpanOf(trace4, UNSET, System.currentTimeMillis() * 4000); + DDSpan child4 = newSpanOf(parent4); + parent3.finish(); + parent4.finish(); + Map entries2 = buildAndExtractZip(); + + verify(dumpReporter).prepareForFlare(); + verify(dumpReporter).addReportToFlare(any()); + verify(dumpReporter).cleanupAfterFlare(); + + assertEquals(1, entries2.size()); + String pendingTraceText2 = (String) entries2.get("pending_traces.txt"); + List> parsedTraces2 = parseTraceLines(pendingTraceText2); + assertEquals(2, parsedTraces2.size()); + + child3.finish(); + child4.finish(); + + long deadline = System.currentTimeMillis() + 3000; + while ((trace1.size() != 0 || trace2.size() != 0 || trace3.size() != 0 || trace4.size() != 0) + && System.currentTimeMillis() < deadline) { + Thread.sleep(50); + } + + assertEquals(0, trace1.size()); + assertEquals(0, trace1.getPendingReferenceCount()); + assertEquals(0, trace2.size()); + assertEquals(0, trace2.getPendingReferenceCount()); + assertEquals(0, trace3.size()); + assertEquals(0, trace3.getPendingReferenceCount()); + assertEquals(0, trace4.size()); + assertEquals(0, trace4.getPendingReferenceCount()); + } + + private DDSpan addContinuation(DDSpan span) { + TraceScope scope = scopeManager.activateSpan(span); + continuations.add(scopeManager.captureSpan(span)); + scope.close(); + return span; + } + + private static DDSpan newSpanOf(PendingTrace trace) { + return newSpanOf(trace, UNSET, 0); + } + + private static DDSpan newSpanOf(PendingTrace trace, int samplingPriority, long timestampMicro) { + DDSpanContext context = + 
new DDSpanContext( + trace.getTraceId(), + 1, + DDSpanId.ZERO, + null, + "fakeService", + "fakeOperation", + "fakeResource", + samplingPriority, + null, + Collections.emptyMap(), + false, + "fakeType", + 0, + trace, + null, + null, + NoopPathwayContext.INSTANCE, + false, + PropagationTags.factory().empty()); + return DDSpan.create("test", timestampMicro, context, null); + } + + private static DDSpan newSpanOf(DDSpan parent) { + TraceCollector traceCollector = parent.context().getTraceCollector(); + DDSpanContext context = + new DDSpanContext( + parent.context().getTraceId(), + 2, + parent.context().getSpanId(), + null, + "fakeService", + "fakeOperation", + "fakeResource", + UNSET, + null, + Collections.emptyMap(), + false, + "fakeType", + 0, + traceCollector, + null, + null, + NoopPathwayContext.INSTANCE, + false, + PropagationTags.factory().empty()); + return DDSpan.create("test", 0, context, null); + } + + private Map buildAndExtractZip() throws IOException { + TracerFlare.prepareForFlare(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (ZipOutputStream zip = new ZipOutputStream(out)) { + TracerFlare.addReportsToFlare(zip); + } finally { + TracerFlare.cleanupAfterFlare(); + } + + Map entries = new LinkedHashMap<>(); + try (ZipInputStream zip = new ZipInputStream(new ByteArrayInputStream(out.toByteArray()))) { + ZipEntry entry; + byte[] buf = new byte[4096]; + while ((entry = zip.getNextEntry()) != null) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + int read; + while ((read = zip.read(buf)) != -1) { + bytes.write(buf, 0, read); + } + String name = entry.getName(); + if (name.endsWith(".bin")) { + entries.put(name, bytes.toByteArray()); + } else { + entries.put(name, new String(bytes.toByteArray(), UTF_8)); + } + } + } + return entries; + } + + private static List> parseTraceLines(String text) throws IOException { + List> allSpans = new ArrayList<>(); + for (String line : text.split("\n")) { + if (!line.isEmpty()) { + List> 
lineSpans = + OBJECT_MAPPER.readValue(line, new TypeReference>>() {}); + allSpans.addAll(lineSpans); + } + } + return allSpans; + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceStrictWriteTest.java b/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceStrictWriteTest.java new file mode 100644 index 00000000000..47cc41da7a0 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceStrictWriteTest.java @@ -0,0 +1,63 @@ +package datadog.trace.core; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import java.util.ArrayList; +import java.util.Arrays; +import org.junit.jupiter.api.Test; + +public class PendingTraceStrictWriteTest extends PendingTraceTestBase { + + @Test + void traceNotReportedUntilContinuationClosed() throws InterruptedException { + AgentScope scope = tracer.activateSpan(rootSpan); + AgentScope.Continuation continuation = tracer.captureActiveSpan(); + scope.close(); + rootSpan.finish(); + + assertEquals(1, traceCollector.getPendingReferenceCount()); + assertEquals(Arrays.asList(rootSpan), new ArrayList<>(traceCollector.getSpans())); + assertTrue(writer.isEmpty()); + // root span buffer delay expires + writer.waitForTracesMax(1, 1); + + assertEquals(1, traceCollector.getPendingReferenceCount()); + assertEquals(Arrays.asList(rootSpan), new ArrayList<>(traceCollector.getSpans())); + assertTrue(writer.isEmpty()); + assertEquals(0, writer.getTraceCount()); + // continuation is closed + continuation.cancel(); + + assertEquals(0, traceCollector.getPendingReferenceCount()); + assertTrue(traceCollector.getSpans().isEmpty()); + assertEquals(Arrays.asList(Arrays.asList(rootSpan)), new ArrayList<>(writer)); + assertEquals(1, writer.getTraceCount()); + } + + @Test + void 
negativeReferenceCountThrowsException() { + AgentScope scope = tracer.activateSpan(rootSpan); + AgentScope.Continuation continuation = tracer.captureActiveSpan(); + scope.close(); + rootSpan.finish(); + + assertEquals(1, traceCollector.getPendingReferenceCount()); + assertEquals(Arrays.asList(rootSpan), new ArrayList<>(traceCollector.getSpans())); + assertTrue(writer.isEmpty()); + // continuation is finished the first time + continuation.cancel(); + + assertEquals(0, traceCollector.getPendingReferenceCount()); + assertTrue(traceCollector.getSpans().isEmpty()); + assertEquals(Arrays.asList(Arrays.asList(rootSpan)), new ArrayList<>(writer)); + assertEquals(1, writer.getTraceCount()); + // continuation is finished the second time + // Yes this should be guarded by the used flag in the continuation, + // so remove it anyway to trigger the exception + assertThrows( + IllegalStateException.class, () -> traceCollector.removeContinuation(continuation)); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceTest.java b/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceTest.java new file mode 100644 index 00000000000..0020fa3a157 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceTest.java @@ -0,0 +1,221 @@ +package datadog.trace.core; + +import static datadog.trace.api.sampling.PrioritySampling.UNSET; +import static datadog.trace.api.sampling.PrioritySampling.USER_KEEP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import datadog.environment.JavaVirtualMachine; +import datadog.trace.api.DDTraceId; +import datadog.trace.api.datastreams.NoopPathwayContext; +import datadog.trace.api.time.TimeSource; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import 
datadog.trace.core.monitor.HealthMetrics; +import datadog.trace.core.propagation.PropagationTags; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(60) +public class PendingTraceTest extends PendingTraceTestBase { + + @BeforeAll + static void checkJvm() { + Assumptions.assumeFalse( + JavaVirtualMachine.isOracleJDK8(), + "Oracle JDK 1.8 did not merge the fix in JDK-8058322, leading to the JVM failing to" + + " correctly extract method parameters without args, when the code is compiled on a" + + " later JDK (targeting 8). This can manifest when creating mocks."); + } + + @Override + protected boolean useStrictTraceWrites() { + // This tests the behavior of the relaxed pending trace implementation + return false; + } + + private DDSpan createSimpleSpan(PendingTrace trace) { + return createSimpleSpanWithID(trace, 1); + } + + private DDSpan createSimpleSpanWithID(PendingTrace trace, long id) { + return new DDSpan( + "test", + 0L, + new DDSpanContext( + DDTraceId.from(1), + id, + 0, + null, + "", + "", + "", + UNSET, + "", + Collections.emptyMap(), + false, + "", + 0, + trace, + null, + null, + NoopPathwayContext.INSTANCE, + false, + PropagationTags.factory().empty()), + null); + } + + @Test + void traceStillReportedWhenUnfinishedContinuationDiscarded() + throws InterruptedException, TimeoutException { + AgentScope scope = tracer.activateSpan(rootSpan); + tracer.captureActiveSpan(); + scope.close(); + + rootSpan.finish(); + + assertEquals(1, traceCollector.getPendingReferenceCount()); + assertEquals(Arrays.asList(rootSpan), new ArrayList<>(traceCollector.getSpans())); + assertTrue(writer.isEmpty()); + // root span buffer delay expires + writer.waitForTraces(1); + + assertEquals(1, 
traceCollector.getPendingReferenceCount()); + assertTrue(traceCollector.getSpans().isEmpty()); + assertEquals(Arrays.asList(Arrays.asList(rootSpan)), new ArrayList<>(writer)); + assertEquals(1, writer.getTraceCount()); + } + + @Test + void verifyHealthMetricsCalled() { + CoreTracer stubTracer = mock(CoreTracer.class); + CoreTracer.ConfigSnapshot traceConfig = mock(CoreTracer.ConfigSnapshot.class); + PendingTraceBuffer buffer = mock(PendingTraceBuffer.class); + HealthMetrics healthMetrics = mock(HealthMetrics.class); + + when(stubTracer.captureTraceConfig()).thenReturn(traceConfig); + when(traceConfig.getServiceMapping()).thenReturn(Collections.emptyMap()); + + PendingTrace trace = + new PendingTrace( + stubTracer, + DDTraceId.from(0), + buffer, + mock(TimeSource.class), + null, + false, + healthMetrics); + + DDSpan span = createSimpleSpan(trace); + trace.registerSpan(span); + + verify(healthMetrics, times(1)).onCreateSpan(); + + span.finish(); + + verify(healthMetrics, times(1)).onCreateTrace(); + } + + @Test + void writeWhenRunningSpansDisabledOnlyCompletedSpansWritten() { + CoreTracer stubTracer = mock(CoreTracer.class); + CoreTracer.ConfigSnapshot traceConfig = mock(CoreTracer.ConfigSnapshot.class); + PendingTraceBuffer buffer = mock(PendingTraceBuffer.class); + HealthMetrics healthMetrics = mock(HealthMetrics.class); + + when(stubTracer.captureTraceConfig()).thenReturn(traceConfig); + when(traceConfig.getServiceMapping()).thenReturn(Collections.emptyMap()); + when(buffer.longRunningSpansEnabled()).thenReturn(true); + + PendingTrace trace = + new PendingTrace( + stubTracer, + DDTraceId.from(0), + buffer, + mock(TimeSource.class), + null, + false, + healthMetrics); + + DDSpan span1 = createSimpleSpanWithID(trace, 39); + span1.setDurationNano(31); + span1.setSamplingPriority(USER_KEEP); + trace.registerSpan(span1); + + DDSpan unfinishedSpan = createSimpleSpanWithID(trace, 191); + trace.registerSpan(unfinishedSpan); + + DDSpan span2 = createSimpleSpanWithID(trace, 
9999); + span2.setDurationNano(9191); + trace.registerSpan(span2); + + List traceToWrite = new ArrayList<>(0); + int completedSpans = trace.enqueueSpansToWrite(traceToWrite, false); + + assertEquals(2, completedSpans); + assertEquals(2, traceToWrite.size()); + assertTrue(traceToWrite.containsAll(Arrays.asList(span1, span2))); + assertEquals(1, trace.getSpans().size()); + assertEquals(unfinishedSpan, trace.getSpans().iterator().next()); + } + + @Test + void writeWhenRunningSpansEnabledCompleteAndRunningSpansWritten() { + CoreTracer stubTracer = mock(CoreTracer.class); + CoreTracer.ConfigSnapshot traceConfig = mock(CoreTracer.ConfigSnapshot.class); + PendingTraceBuffer buffer = mock(PendingTraceBuffer.class); + HealthMetrics healthMetrics = mock(HealthMetrics.class); + + when(stubTracer.captureTraceConfig()).thenReturn(traceConfig); + when(traceConfig.getServiceMapping()).thenReturn(Collections.emptyMap()); + when(buffer.longRunningSpansEnabled()).thenReturn(true); + + PendingTrace trace = + new PendingTrace( + stubTracer, + DDTraceId.from(0), + buffer, + mock(TimeSource.class), + null, + false, + healthMetrics); + + DDSpan span1 = createSimpleSpanWithID(trace, 39); + span1.setDurationNano(31); + span1.setSamplingPriority(USER_KEEP); + trace.registerSpan(span1); + + DDSpan unfinishedSpan = createSimpleSpanWithID(trace, 191); + trace.registerSpan(unfinishedSpan); + + DDSpan span2 = createSimpleSpanWithID(trace, 9999); + span2.setServiceName("9191"); + span2.setDurationNano(9191); + trace.registerSpan(span2); + + DDSpan unfinishedSpan2 = createSimpleSpanWithID(trace, 77771); + trace.registerSpan(unfinishedSpan2); + + List traceToWrite = new ArrayList<>(0); + int completedSpans = trace.enqueueSpansToWrite(traceToWrite, true); + + assertEquals(2, completedSpans); + assertEquals(4, traceToWrite.size()); + assertTrue( + traceToWrite.containsAll(Arrays.asList(span1, span2, unfinishedSpan, unfinishedSpan2))); + assertEquals(2, trace.getSpans().size()); + assertTrue( + new 
ArrayList<>(trace.getSpans()) + .containsAll(Arrays.asList(unfinishedSpan, unfinishedSpan2))); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceTestBase.java b/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceTestBase.java new file mode 100644 index 00000000000..c6b5f5a54af --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/core/PendingTraceTestBase.java @@ -0,0 +1,285 @@ +package datadog.trace.core; + +import static datadog.trace.api.config.TracerConfig.PARTIAL_FLUSH_MIN_SPANS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import datadog.trace.common.writer.ListWriter; +import datadog.trace.junit.utils.config.WithConfig; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; +import org.tabletest.junit.TableTest; + +public abstract class PendingTraceTestBase extends DDCoreJavaSpecification { + + protected ListWriter writer; + protected CoreTracer tracer; + protected DDSpan rootSpan; + protected PendingTrace traceCollector; + + @BeforeEach + void setup() throws Exception { + writer = new ListWriter(); + tracer = tracerBuilder().writer(writer).build(); + rootSpan = (DDSpan) tracer.buildSpan("datadog", "fakeOperation").start(); + traceCollector = (PendingTrace) rootSpan.context().getTraceCollector(); + + assertEquals(0, traceCollector.size()); + assertEquals(1, traceCollector.getPendingReferenceCount()); + assertFalse(traceCollector.isRootSpanWritten()); + } + + @AfterEach + void cleanup() { + if 
(tracer != null) { + tracer.close(); + } + } + + @Test + void singleSpanWrittenWhenFinished() throws InterruptedException, TimeoutException { + rootSpan.finish(); + writer.waitForTraces(1); + + assertTrue(traceCollector.getSpans().isEmpty()); + assertEquals(Arrays.asList(Arrays.asList(rootSpan)), new ArrayList<>(writer)); + assertEquals(1, writer.getTraceCount()); + } + + @Test + void childFinishesBeforeParent() throws InterruptedException, TimeoutException { + DDSpan child = + (DDSpan) tracer.buildSpan("datadog", "child").asChildOf(rootSpan.context()).start(); + + assertEquals(2, traceCollector.getPendingReferenceCount()); + + child.finish(); + + assertEquals(1, traceCollector.getPendingReferenceCount()); + assertEquals(Arrays.asList(child), new ArrayList<>(traceCollector.getSpans())); + assertTrue(writer.isEmpty()); + + rootSpan.finish(); + writer.waitForTraces(1); + + assertEquals(0, traceCollector.getPendingReferenceCount()); + assertTrue(traceCollector.getSpans().isEmpty()); + assertEquals(Arrays.asList(Arrays.asList(rootSpan, child)), new ArrayList<>(writer)); + assertEquals(1, writer.getTraceCount()); + } + + @Test + void parentFinishesBeforeChild() throws InterruptedException, TimeoutException { + DDSpan child = + (DDSpan) tracer.buildSpan("datadog", "child").asChildOf(rootSpan.context()).start(); + + assertEquals(2, traceCollector.getPendingReferenceCount()); + + rootSpan.finish(); + + assertEquals(1, traceCollector.getPendingReferenceCount()); + assertEquals(Arrays.asList(rootSpan), new ArrayList<>(traceCollector.getSpans())); + assertTrue(writer.isEmpty()); + + child.finish(); + writer.waitForTraces(1); + + assertEquals(0, traceCollector.getPendingReferenceCount()); + assertTrue(traceCollector.getSpans().isEmpty()); + assertEquals(Arrays.asList(Arrays.asList(child, rootSpan)), new ArrayList<>(writer)); + assertEquals(1, writer.getTraceCount()); + } + + @Test + void childSpansCreatedAfterWrittenReportedSeparately() + throws InterruptedException, 
TimeoutException { + rootSpan.finish(); + // this shouldn't happen, but it's possible users of the api + // may incorrectly add spans after the trace is reported. + // in those cases we should still decrement the pending trace count + DDSpan childSpan = + (DDSpan) tracer.buildSpan("datadog", "child").asChildOf(rootSpan.context()).start(); + childSpan.finish(); + writer.waitForTraces(2); + + assertEquals(0, traceCollector.getPendingReferenceCount()); + assertTrue(traceCollector.getSpans().isEmpty()); + assertEquals( + Arrays.asList(Arrays.asList(rootSpan), Arrays.asList(childSpan)), new ArrayList<>(writer)); + } + + @Test + void testGetCurrentTimeNano() { + long diffSeconds = + Math.abs( + TimeUnit.NANOSECONDS.toSeconds(traceCollector.getCurrentTimeNano()) + - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); + // Generous 5 seconds to execute this test + assertTrue(diffSeconds < 5, "Expected time difference < 5 seconds, got: " + diffSeconds); + } + + @Test + @WithConfig(key = PARTIAL_FLUSH_MIN_SPANS, value = "2") + void partialFlush() throws InterruptedException, TimeoutException { + CoreTracer quickTracer = tracerBuilder().writer(writer).build(); + try { + DDSpan localRoot = (DDSpan) quickTracer.buildSpan("datadog", "root").start(); + PendingTrace trace = (PendingTrace) localRoot.context().getTraceCollector(); + DDSpan child1 = + (DDSpan) + quickTracer.buildSpan("datadog", "child1").asChildOf(localRoot.context()).start(); + DDSpan child2 = + (DDSpan) + quickTracer.buildSpan("datadog", "child2").asChildOf(localRoot.context()).start(); + + assertEquals(3, trace.getPendingReferenceCount()); + + child2.finish(); + + assertEquals(2, trace.getPendingReferenceCount()); + assertEquals(Arrays.asList(child2), new ArrayList<>(trace.getSpans())); + assertTrue(writer.isEmpty()); + assertEquals(0, writer.getTraceCount()); + + child1.finish(); + writer.waitForTraces(1); + + assertEquals(1, trace.getPendingReferenceCount()); + assertEquals(Arrays.asList(), new 
ArrayList<>(trace.getSpans())); + assertEquals(Arrays.asList(Arrays.asList(child1, child2)), new ArrayList<>(writer)); + assertEquals(1, writer.getTraceCount()); + + localRoot.finish(); + writer.waitForTraces(2); + + assertEquals(0, trace.getPendingReferenceCount()); + assertTrue(trace.getSpans().isEmpty()); + assertEquals( + Arrays.asList(Arrays.asList(child1, child2), Arrays.asList(localRoot)), + new ArrayList<>(writer)); + assertEquals(2, writer.getTraceCount()); + } finally { + quickTracer.close(); + } + } + + @Test + @WithConfig(key = PARTIAL_FLUSH_MIN_SPANS, value = "2") + void partialFlushWithRootSpanClosedLast() throws InterruptedException, TimeoutException { + CoreTracer quickTracer = tracerBuilder().writer(writer).build(); + try { + DDSpan localRoot = (DDSpan) quickTracer.buildSpan("datadog", "root").start(); + PendingTrace trace = (PendingTrace) localRoot.context().getTraceCollector(); + DDSpan child1 = + (DDSpan) + quickTracer.buildSpan("datadog", "child1").asChildOf(localRoot.context()).start(); + DDSpan child2 = + (DDSpan) + quickTracer.buildSpan("datadog", "child2").asChildOf(localRoot.context()).start(); + + assertEquals(3, trace.getPendingReferenceCount()); + + child1.finish(); + + assertEquals(2, trace.getPendingReferenceCount()); + assertEquals(Arrays.asList(child1), new ArrayList<>(trace.getSpans())); + assertTrue(writer.isEmpty()); + assertEquals(0, writer.getTraceCount()); + + child2.finish(); + writer.waitForTraces(1); + + assertEquals(1, trace.getPendingReferenceCount()); + assertTrue(trace.getSpans().isEmpty()); + assertEquals(Arrays.asList(Arrays.asList(child2, child1)), new ArrayList<>(writer)); + assertEquals(1, writer.getTraceCount()); + + localRoot.finish(); + writer.waitForTraces(2); + + assertEquals(0, trace.getPendingReferenceCount()); + assertTrue(trace.getSpans().isEmpty()); + assertEquals( + Arrays.asList(Arrays.asList(child2, child1), Arrays.asList(localRoot)), + new ArrayList<>(writer)); + assertEquals(2, 
writer.getTraceCount()); + } finally { + quickTracer.close(); + } + } + + // spotless:off + @TableTest({ + "scenario | threadCount | spanCount", + "1 thread 1 span | 1 | 1 ", + "2 threads 1 span | 2 | 1 ", + "1 thread 2 spans | 1 | 2 ", + // Sufficiently large to fill the buffer: + "5 threads 2000 | 5 | 2000 ", + "10 threads 1000 | 10 | 1000 ", + "50 threads 500 | 50 | 500 " + }) + // spotless:on + void partialFlushConcurrencyTest(int threadCount, int spanCount) + throws InterruptedException, TimeoutException { + // reduce logging noise + Logger logger = (Logger) LoggerFactory.getLogger("datadog.trace"); + Level previousLevel = logger.getLevel(); + logger.setLevel(Level.OFF); + try { + CountDownLatch latch = new CountDownLatch(1); + DDSpan localRoot = (DDSpan) tracer.buildSpan("test", "root").start(); + PendingTrace localTraceCollector = (PendingTrace) localRoot.context().getTraceCollector(); + List exceptions = new ArrayList<>(); + + List threads = new ArrayList<>(threadCount); + for (int t = 0; t < threadCount; t++) { + Thread thread = + new Thread( + () -> { + try { + latch.await(); + List spans = new ArrayList<>(spanCount); + for (int s = 0; s < spanCount; s++) { + spans.add((DDSpan) tracer.startSpan("test", "child", localRoot.context())); + } + for (DDSpan span : spans) { + span.finish(); + } + } catch (Throwable ex) { + exceptions.add(ex); + } + }); + thread.start(); + threads.add(thread); + } + // Finish root span so other spans are queued automatically + localRoot.finish(); + writer.waitForTraces(1); + + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + localTraceCollector.getPendingTraceBuffer().flush(); + + assertTrue(exceptions.isEmpty(), "Exceptions in worker threads: " + exceptions); + assertEquals(0, localTraceCollector.getPendingReferenceCount()); + int totalSpans = writer.stream().mapToInt(List::size).sum(); + assertEquals(threadCount * spanCount + 1, totalSpans); + } finally { + logger.setLevel(previousLevel); + } + } +} 
diff --git a/dd-trace-core/src/test/java/datadog/trace/core/TraceInterceptorTest.java b/dd-trace-core/src/test/java/datadog/trace/core/TraceInterceptorTest.java new file mode 100644 index 00000000000..0d6f6788ca6 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/core/TraceInterceptorTest.java @@ -0,0 +1,251 @@ +package datadog.trace.core; + +import static java.util.Collections.emptyList; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.TestInterceptor; +import datadog.trace.api.GlobalTracer; +import datadog.trace.api.TagMap; +import datadog.trace.api.config.TracerConfig; +import datadog.trace.api.interceptor.MutableSpan; +import datadog.trace.api.interceptor.TraceInterceptor; +import datadog.trace.common.writer.ListWriter; +import datadog.trace.junit.utils.config.WithConfig; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.tabletest.junit.TableTest; + +@Timeout(10) +@WithConfig(key = TracerConfig.TRACE_GIT_METADATA_ENABLED, value = "false") +public class TraceInterceptorTest extends DDCoreJavaSpecification { + + private ListWriter writer; + private CoreTracer tracer; + + @BeforeEach + void setup() { + writer = new ListWriter(); + tracer = tracerBuilder().writer(writer).build(); + } + + @AfterEach + void cleanup() { + if (tracer != null) { + tracer.close(); + } + } + + @Test + void 
interceptorIsRegisteredAsService() { + assertInstanceOf(TestInterceptor.class, tracer.getInterceptors().interceptors()[0]); + } + + @Test + void interceptorsWithSamePriorityReplaced() { + int priority = 999; + TestInterceptor.priority = priority; + tracer + .getInterceptors() + .add( + new TraceInterceptor() { + @Override + public Collection onTraceComplete( + Collection trace) { + return emptyList(); + } + + @Override + public int priority() { + return priority; + } + }); + + TraceInterceptor[] interceptors = tracer.getInterceptors().interceptors(); + assertEquals(1, interceptors.length); + assertInstanceOf(TestInterceptor.class, interceptors[0]); + } + + @TableTest({ + "scenario | score | reverse", + "lower than existing | -1 | false ", + "higher than existing | 1000 | true " + }) + void interceptorsWithDifferentPrioritySorted(int score, boolean reverse) { + TraceInterceptor existingInterceptor = tracer.getInterceptors().interceptors()[0]; + TraceInterceptor newInterceptor = + new TraceInterceptor() { + @Override + public Collection onTraceComplete( + Collection trace) { + return emptyList(); + } + + @Override + public int priority() { + return score; + } + }; + tracer.getInterceptors().add(newInterceptor); + + List sorted = Arrays.asList(tracer.getInterceptors().interceptors()); + assertEquals(2, sorted.size()); + if (reverse) { + assertEquals(existingInterceptor, sorted.get(0)); + assertEquals(newInterceptor, sorted.get(1)); + } else { + assertEquals(newInterceptor, sorted.get(0)); + assertEquals(existingInterceptor, sorted.get(1)); + } + } + + @TableTest({ + "scenario | deltaPriority | expectedSize", + "below priority | -1 | 2 ", + "same priority | 0 | 1 ", + "above priority | 1 | 2 " + }) + void interceptorCanDiscardTrace(int deltaPriority, int expectedSize) + throws InterruptedException, TimeoutException { + int score = TestInterceptor.priority + deltaPriority; + AtomicBoolean called = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); 
+ tracer + .getInterceptors() + .add( + new TraceInterceptor() { + @Override + public Collection onTraceComplete( + Collection trace) { + called.set(true); + latch.countDown(); + return emptyList(); + } + + @Override + public int priority() { + return score; + } + }); + + tracer.buildSpan("datadog", "test " + score).start().finish(); + if (score == TestInterceptor.priority) { + writer.waitForTraces(1); + } else { + latch.await(5, TimeUnit.SECONDS); + } + + TraceInterceptor[] interceptors = tracer.getInterceptors().interceptors(); + assertEquals(expectedSize, interceptors.length); + assertEquals(score != TestInterceptor.priority, called.get()); + assertEquals(score != TestInterceptor.priority, writer.isEmpty()); + } + + @Test + void interceptorCanModifySpan() throws InterruptedException, TimeoutException { + tracer + .getInterceptors() + .add( + new TraceInterceptor() { + @Override + public Collection onTraceComplete( + Collection trace) { + for (MutableSpan span : trace) { + span.setOperationName("modifiedON-" + span.getOperationName()) + .setServiceName("modifiedSN-" + span.getServiceName()) + .setResourceName("modifiedRN-" + span.getResourceName()) + .setSpanType("modifiedST-" + span.getSpanType()) + .setTag("boolean-tag", true) + .setTag("number-tag", 5.0) + .setTag("string-tag", "howdy") + .setError(true); + } + return trace; + } + + @Override + public int priority() { + return 1; + } + }); + + tracer.buildSpan("datadog", "test").start().finish(); + writer.waitForTraces(1); + + List trace = writer.firstTrace(); + assertEquals(1, trace.size()); + + DDSpan span = trace.get(0); + assertEquals("modifiedON-test", span.context().getOperationName().toString()); + assertTrue(span.getServiceName().startsWith("modifiedSN-")); + assertEquals("modifiedRN-modifiedON-test", span.getResourceName().toString()); + assertEquals("modifiedST-null", span.getSpanType()); + assertTrue(span.context().getErrorFlag()); + + TagMap tags = span.context().getTags(); + assertEquals(true, 
tags.get("boolean-tag")); + assertEquals(5.0, tags.get("number-tag")); + assertEquals("howdy", tags.get("string-tag")); + assertNotNull(tags.get("thread.name")); + assertNotNull(tags.get("thread.id")); + assertNotNull(tags.get("runtime-id")); + assertNotNull(tags.get("language")); + assertTrue(tags.size() >= 7); + } + + @Test + void robustWhenInterceptorReturnsNull() { + tracer + .getInterceptors() + .add( + new TraceInterceptor() { + @Override + public Collection onTraceComplete( + Collection trace) { + return null; + } + + @Override + public int priority() { + return 0; + } + }); + + DDSpan span = (DDSpan) tracer.startSpan("test", "test"); + span.phasedFinish(); + assertDoesNotThrow(() -> tracer.write(SpanList.of(span))); + } + + @Test + void registerInterceptorThroughBridge() { + GlobalTracer.registerIfAbsent(tracer); + TraceInterceptor interceptor = + new TraceInterceptor() { + @Override + public Collection onTraceComplete( + Collection trace) { + return trace; + } + + @Override + public int priority() { + return 38; + } + }; + + assertTrue(GlobalTracer.get().addTraceInterceptor(interceptor)); + assertTrue(Arrays.asList(tracer.getInterceptors().interceptors()).contains(interceptor)); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/core/TracingConfigPollerTest.java b/dd-trace-core/src/test/java/datadog/trace/core/TracingConfigPollerTest.java new file mode 100644 index 00000000000..a5105d326c9 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/core/TracingConfigPollerTest.java @@ -0,0 +1,301 @@ +package datadog.trace.core; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static 
org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.metrics.api.Monitoring;
import datadog.remoteconfig.ConfigurationPoller;
import datadog.remoteconfig.Product;
import datadog.remoteconfig.state.ParsedConfigKey;
import datadog.remoteconfig.state.ProductListener;
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.tabletest.junit.TableTest;

/**
 * Tests for {@code TracingConfigPoller}: LibConfig merging, override-priority calculation, and the
 * remote-config accept/commit/remove lifecycle as observed through {@code
 * CoreTracer#captureTraceConfig()}.
 *
 * <p>The class-level timeout guards against the tracer builder or poller wiring hanging the suite.
 */
@Timeout(10)
public class TracingConfigPollerTest extends DDCoreJavaSpecification {

  /**
   * Merging a list of LibConfigs takes, for each field independently, the first non-null value in
   * list order: config1 contributes nothing (all fields null), so every merged field comes from
   * config2, and config3's (opposite) values are ignored.
   */
  @Test
  void mergeLibConfigsWithNullAndNonNullValues() {
    TracingConfigPoller.LibConfig config1 = new TracingConfigPoller.LibConfig(); // all nulls
    TracingConfigPoller.LibConfig config2 = new TracingConfigPoller.LibConfig();
    config2.tracingEnabled = true;
    config2.debugEnabled = false;
    config2.runtimeMetricsEnabled = true;
    config2.logsInjectionEnabled = false;
    config2.dataStreamsEnabled = true;
    config2.traceSampleRate = 0.5;
    config2.dynamicInstrumentationEnabled = true;
    config2.exceptionReplayEnabled = false;
    config2.codeOriginEnabled = true;
    config2.liveDebuggingEnabled = false;

    // config3 sets every field to the opposite of config2 to prove "first non-null wins".
    TracingConfigPoller.LibConfig config3 = new TracingConfigPoller.LibConfig();
    config3.tracingEnabled = false;
    config3.debugEnabled = true;
    config3.runtimeMetricsEnabled = false;
    config3.logsInjectionEnabled = true;
    config3.dataStreamsEnabled = false;
    config3.traceSampleRate = 0.8;
    config3.dynamicInstrumentationEnabled = false;
    config3.exceptionReplayEnabled = true;
    config3.codeOriginEnabled = false;
    config3.liveDebuggingEnabled = true;

    TracingConfigPoller.LibConfig merged =
        TracingConfigPoller.LibConfig.mergeLibConfigs(Arrays.asList(config1, config2, config3));

    assertNotNull(merged);
    // Should take first non-null values from config2
    assertEquals(Boolean.TRUE, merged.tracingEnabled);
    assertEquals(Boolean.FALSE, merged.debugEnabled);
    assertEquals(Boolean.TRUE, merged.runtimeMetricsEnabled);
    assertEquals(Boolean.FALSE, merged.logsInjectionEnabled);
    assertEquals(Boolean.TRUE, merged.dataStreamsEnabled);
    assertEquals(0.5, merged.traceSampleRate);
    assertEquals(Boolean.TRUE, merged.dynamicInstrumentationEnabled);
    assertEquals(Boolean.FALSE, merged.exceptionReplayEnabled);
    assertEquals(Boolean.TRUE, merged.codeOriginEnabled);
    assertEquals(Boolean.FALSE, merged.liveDebuggingEnabled);
  }

  /**
   * Override-priority ranking (higher wins): exact service+env (5) > exact service with wildcard
   * env (4) > wildcard service with exact env (3) > cluster target (2) > org-wide wildcard (1).
   *
   * <p>NOTE(review): the guards below rely on empty table cells arriving as {@code null} — confirm
   * that is TableTest's behavior for blank columns.
   */
  @TableTest({
    "scenario             | service      | env     | clusterName  | expectedPriority",
    "service and env      | test-service | staging |              | 5               ",
    "service and wildcard | test-service | *       |              | 4               ",
    "wildcard and env     | *            | staging |              | 3               ",
    "cluster target       |              |         | test-cluster | 2               ",
    "wildcard org level   | *            | *       |              | 1               "
  })
  void configPriorityCalculation(
      String service, String env, String clusterName, int expectedPriority) {
    TracingConfigPoller.ConfigOverrides configOverrides = new TracingConfigPoller.ConfigOverrides();
    // Only build a service target when the row supplies at least one of service/env.
    if (service != null || env != null) {
      configOverrides.serviceTarget = new TracingConfigPoller.ServiceTarget();
      configOverrides.serviceTarget.service = service;
      configOverrides.serviceTarget.env = env;
    }
    // Cluster rows exercise the k8s target path instead of the service target.
    if (clusterName != null) {
      TracingConfigPoller.ClusterTarget clusterTarget = new TracingConfigPoller.ClusterTarget();
      clusterTarget.clusterName = clusterName;
      clusterTarget.enabled = true;
      configOverrides.k8sTargetV2 = new TracingConfigPoller.K8sTargetV2();
      configOverrides.k8sTargetV2.clusterTargets = Collections.singletonList(clusterTarget);
    }
    configOverrides.libConfig = new TracingConfigPoller.LibConfig();

    assertEquals(expectedPriority, configOverrides.getOverridePriority());
  }

  /**
   * End-to-end accept/commit/remove lifecycle: an org-level config (priority 1) and a
   * service-level config (priority 4) are both accepted; after commit the higher-priority service
   * config wins. Removing the service config falls back to the org config, and removing that too
   * restores the defaults.
   */
  @Test
  void actualConfigCommitWithServiceAndOrgLevelConfigs() throws Exception {
    ParsedConfigKey orgKey = ParsedConfigKey.parse("datadog/2/APM_TRACING/org_config/config");
    ParsedConfigKey serviceKey =
        ParsedConfigKey.parse("datadog/2/APM_TRACING/service_config/config");
    ConfigurationPoller poller = mock(ConfigurationPoller.class);
    SharedCommunicationObjects sco = createScoWithPoller(poller);

    // Capture the ProductListener the tracer registers so the test can drive it directly,
    // without running the real poller.
    ProductListener[] capturedUpdater = {null};
    doAnswer(
            inv -> {
              // capture config updater for further testing
              capturedUpdater[0] = inv.getArgument(1, ProductListener.class);
              return null;
            })
        .when(poller)
        .addListener(eq(Product.APM_TRACING), any(ProductListener.class));

    CoreTracer tracer =
        CoreTracer.builder().sharedCommunicationObjects(sco).pollForTracingConfiguration().build();
    unclosedTracers.add(tracer);

    try {
      verify(poller).addListener(eq(Product.APM_TRACING), any(ProductListener.class));
      assertNotNull(capturedUpdater[0]);
      // Baseline before any remote config is applied.
      assertEquals(Collections.emptyMap(), tracer.captureTraceConfig().getServiceMapping());
      assertNull(tracer.captureTraceConfig().getTraceSampleRate());

      ProductListener updater = capturedUpdater[0];
      // Add org level config (priority 1) - should set service mapping
      updater.accept(
          orgKey,
          ("{\n"
                  + "  \"service_target\": {\n"
                  + "    \"service\": \"*\",\n"
                  + "    \"env\": \"*\"\n"
                  + "  },\n"
                  + "  \"lib_config\": {\n"
                  + "    \"tracing_service_mapping\": [{\n"
                  + "      \"from_key\": \"org-service\",\n"
                  + "      \"to_name\": \"org-mapped\"\n"
                  + "    }],\n"
                  + "    \"tracing_sampling_rate\": 0.7\n"
                  + "  }\n"
                  + "}")
              .getBytes(StandardCharsets.UTF_8),
          null);
      // Add service level config (priority 4) - should override service mapping and add header tags
      updater.accept(
          serviceKey,
          ("{\n"
                  + "  \"service_target\": {\n"
                  + "    \"service\": \"test-service\",\n"
                  + "    \"env\": \"*\"\n"
                  + "  },\n"
                  + "  \"lib_config\": {\n"
                  + "    \"tracing_service_mapping\": [{\n"
                  + "      \"from_key\": \"service-specific\",\n"
                  + "      \"to_name\": \"service-mapped\"\n"
                  + "    }],\n"
                  + "    \"tracing_header_tags\": [{\n"
                  + "      \"header\": \"X-Custom-Header\",\n"
                  + "      \"tag_name\": \"custom.header\"\n"
                  + "    }],\n"
                  + "    \"tracing_sampling_rate\": 1.3,\n"
                  + "    \"data_streams_transaction_extractors\": [{\n"
                  + "      \"name\": \"test\",\n"
                  + "      \"type\": \"unknown\",\n"
                  + "      \"value\": \"value\"\n"
                  + "    }]\n"
                  + "  }\n"
                  + "}")
              .getBytes(StandardCharsets.UTF_8),
          null);
      // Commit both configs
      updater.commit(null);
      // Service level config should take precedence due to higher priority (4 vs 1)
      assertEquals(
          Collections.singletonMap("service-specific", "service-mapped"),
          tracer.captureTraceConfig().getServiceMapping());
      // NOTE(review): the payload sets 1.3, which is out of [0, 1]; this assertion expects it to
      // be clamped to 1.0 — confirm clamping is the intended behavior.
      assertEquals(1.0, tracer.captureTraceConfig().getTraceSampleRate());
      // Header names are normalized to lowercase on both request and response sides.
      assertEquals(
          Collections.singletonMap("x-custom-header", "custom.header"),
          tracer.captureTraceConfig().getRequestHeaderTags());
      assertEquals(
          Collections.singletonMap("x-custom-header", "custom.header"),
          tracer.captureTraceConfig().getResponseHeaderTags());
      List<DataStreamsTransactionExtractor> extractors =
          tracer.captureTraceConfig().getDataStreamsTransactionExtractors();
      assertEquals(1, extractors.size());
      assertEquals("test", extractors.get(0).getName());
      assertEquals(DataStreamsTransactionExtractor.Type.UNKNOWN, extractors.get(0).getType());
      assertEquals("value", extractors.get(0).getValue());
      // Remove service level config
      updater.remove(serviceKey, null);
      updater.commit(null);
      // Should fall back to org level config
      assertEquals(
          Collections.singletonMap("org-service", "org-mapped"),
          tracer.captureTraceConfig().getServiceMapping());
      assertEquals(0.7, tracer.captureTraceConfig().getTraceSampleRate());
      assertEquals(Collections.emptyMap(), tracer.captureTraceConfig().getRequestHeaderTags());
      assertEquals(Collections.emptyMap(), tracer.captureTraceConfig().getResponseHeaderTags());
      // Remove org level config
      updater.remove(orgKey, null);
      updater.commit(null);
      // Should have no configs
      assertEquals(Collections.emptyMap(), tracer.captureTraceConfig().getServiceMapping());
      assertNull(tracer.captureTraceConfig().getTraceSampleRate());
    } finally {
      tracer.close();
    }
  }

  /**
   * Two org-level configs at the same priority, each setting a different flag, must be merged
   * rather than one replacing the other: after commit both tracing and data streams are enabled.
   */
  @Test
  void twoOrgLevelsConfigSettingDifferentFlagsWorks() throws Exception {
    ParsedConfigKey orgConfig1Key =
        ParsedConfigKey.parse("datadog/2/APM_TRACING/org_config/config1");
    ParsedConfigKey orgConfig2Key =
        ParsedConfigKey.parse("datadog/2/APM_TRACING/org_config/config2");
    ConfigurationPoller poller = mock(ConfigurationPoller.class);
    SharedCommunicationObjects sco = createScoWithPoller(poller);

    // Capture the registered ProductListener so the test can drive it directly.
    ProductListener[] capturedUpdater = {null};
    doAnswer(
            inv -> {
              // capture config updater for further testing
              capturedUpdater[0] = inv.getArgument(1, ProductListener.class);
              return null;
            })
        .when(poller)
        .addListener(eq(Product.APM_TRACING), any(ProductListener.class));

    CoreTracer tracer =
        CoreTracer.builder().sharedCommunicationObjects(sco).pollForTracingConfiguration().build();
    unclosedTracers.add(tracer);

    try {
      verify(poller).addListener(eq(Product.APM_TRACING), any(ProductListener.class));
      // Baseline: tracing on, data streams off.
      assertTrue(tracer.captureTraceConfig().isTraceEnabled());
      assertFalse(tracer.captureTraceConfig().isDataStreamsEnabled());

      ProductListener updater = capturedUpdater[0];
      // Add org level config with ApmTracing enabled
      updater.accept(
          orgConfig1Key,
          ("{\n"
                  + "  \"service_target\": {\n"
                  + "    \"service\": \"*\",\n"
                  + "    \"env\": \"*\"\n"
                  + "  },\n"
                  + "  \"lib_config\": {\n"
                  + "    \"tracing_enabled\": true\n"
                  + "  }\n"
                  + "}")
              .getBytes(StandardCharsets.UTF_8),
          null);
      // Add second org level config with DataStreams enabled
      updater.accept(
          orgConfig2Key,
          ("{\n"
                  + "  \"service_target\": {\n"
                  + "    \"service\": \"*\",\n"
                  + "    \"env\": \"*\"\n"
                  + "  },\n"
                  + "  \"lib_config\": {\n"
                  + "    \"data_streams_enabled\": true\n"
                  + "  }\n"
                  + "}")
              .getBytes(StandardCharsets.UTF_8),
          null);
      // Commit both configs
      updater.commit(null);
      // Both org level configs should be merged, with data streams enabled
      assertTrue(tracer.captureTraceConfig().isTraceEnabled());
      assertTrue(tracer.captureTraceConfig().isDataStreamsEnabled());
    } finally {
      tracer.close();
    }
  }

  /**
   * Builds a SharedCommunicationObjects wired with mocks, injecting the given poller into the
   * private {@code configurationPoller} field via reflection — there is no public setter for it.
   *
   * @param poller the mock poller the tracer under test should register its listener with
   * @return a SharedCommunicationObjects suitable for {@code CoreTracer.builder()}
   * @throws Exception if the reflective field access fails
   */
  private SharedCommunicationObjects createScoWithPoller(ConfigurationPoller poller)
      throws Exception {
    SharedCommunicationObjects sco = new SharedCommunicationObjects();
    sco.agentHttpClient = mock(OkHttpClient.class);
    sco.monitoring = mock(Monitoring.class);
    sco.agentUrl = HttpUrl.get("https://example.com");
    sco.setFeaturesDiscovery(mock(DDAgentFeaturesDiscovery.class));
    Field pollerField = SharedCommunicationObjects.class.getDeclaredField("configurationPoller");
    pollerField.setAccessible(true);
    pollerField.set(sco, poller);
    return sco;
  }
}