diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java new file mode 100644 index 00000000000..f407167be37 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -0,0 +1,433 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY; + +import datadog.metrics.api.Histogram; +import datadog.trace.api.Pair; +import datadog.trace.api.cache.DDCache; +import datadog.trace.api.cache.DDCaches; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.util.Hashtable; +import datadog.trace.util.LongHashingUtils; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; +import javax.annotation.Nullable; + +/** + * Hashtable entry for the consumer-side aggregator. Holds the UTF8-encoded label fields that {@link + * SerializingMetricWriter} writes to the wire plus the mutable counter/histogram state for the key. + * + *

{@link #matches(SpanSnapshot)} compares the entry's stored UTF8 forms against the snapshot's + * raw {@code CharSequence}/{@code String}/{@code String[]} fields via content-equality, so {@code + * String} vs {@code UTF8BytesString} mixing on the same logical key collapses into one entry + * instead of splitting. + * + *

The static UTF8 caches that used to live on {@code MetricKey} and {@code + * ConflatingMetricsAggregator} are consolidated here. + * + *

Deliberate cohesion. This class concentrates five responsibilities -- the static UTF8 + * caches, the canonicalized label fields, the raw {@code peerTagNames}/{@code peerTagValues} arrays + * used by {@link #matches}, the pre-encoded {@code peerTags} list used by the serializer, and the + * mutable counter/histogram aggregate state -- on a single object. The prior design split the label + * fields and aggregate state across separate {@code MetricKey} and {@code AggregateMetric} + * instances, allocating both per unique key on miss; folding them yields one allocation per unique + * key. The class is wider than its predecessors as a result, but that's the trade we explicitly + * chose. + * + *

Required vs optional field absence. Required label fields ({@code resource}, {@code + * service}, {@code operationName}, {@code type}, {@code spanKind}) canonicalize a {@code null} + * snapshot value into {@link UTF8BytesString#EMPTY} via {@link #canonicalize} -- they are never + * {@code null} on a constructed entry. Optional label fields ({@code serviceSource}, {@code + * httpMethod}, {@code httpEndpoint}, {@code grpcStatusCode}) stay {@code null} on the entry when + * the snapshot value was {@code null}; the serializer uses {@code != null} to decide whether to + * emit them on the wire. {@link #contentEquals} treats {@code null} and length-0 as equivalent so + * {@link #matches} works against either form. + * + *

Not thread-safe. Counter and histogram updates are performed by the single aggregator + * thread; producer threads tag durations via {@link #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits and + * hand them off through the snapshot inbox. + * + *

Single-writer invariant relies on convention. The aggregator thread is the only mutator + * of this class and of {@link AggregateTable}. Nothing enforces this at runtime -- a stray mutation + * from a different thread (e.g. an HTTP-client callback) would corrupt counters or hashtable chains + * silently. The {@code ClearSignal} routing in {@link Aggregator} is the explicit mechanism for + * funneling cross-thread requests (e.g. {@code disable()}) back onto the aggregator thread; any new + * entry point that mutates aggregate state must do the same. + */ +final class AggregateEntry extends Hashtable.Entry { + + static final long ERROR_TAG = 0x8000000000000000L; + static final long TOP_LEVEL_TAG = 0x4000000000000000L; + + // UTF8 caches consolidated from the previous MetricKey + ConflatingMetricsAggregator split. + private static final DDCache RESOURCE_CACHE = + DDCaches.newFixedSizeCache(32); + private static final DDCache SERVICE_CACHE = + DDCaches.newFixedSizeCache(32); + private static final DDCache OPERATION_CACHE = + DDCaches.newFixedSizeCache(64); + private static final DDCache SERVICE_SOURCE_CACHE = + DDCaches.newFixedSizeCache(16); + private static final DDCache TYPE_CACHE = DDCaches.newFixedSizeCache(8); + private static final DDCache SPAN_KIND_CACHE = + DDCaches.newFixedSizeCache(16); + private static final DDCache HTTP_METHOD_CACHE = + DDCaches.newFixedSizeCache(8); + private static final DDCache HTTP_ENDPOINT_CACHE = + DDCaches.newFixedSizeCache(32); + private static final DDCache GRPC_STATUS_CODE_CACHE = + DDCaches.newFixedSizeCache(32); + + /** + * Outer cache keyed by peer-tag name, with an inner per-name cache keyed by value. The inner + * cache produces the "name:value" encoded form the serializer writes. + */ + private static final DDCache< + String, Pair, Function>> + PEER_TAGS_CACHE = DDCaches.newFixedSizeCache(64); + + private static final Function< + String, Pair, Function>> + PEER_TAGS_CACHE_ADDER = + key -> + Pair.of( + DDCaches.newFixedSizeCache(512), + value -> UTF8BytesString.create(key + ":" + value)); + + private final UTF8BytesString resource; + private final UTF8BytesString service; + private final UTF8BytesString operationName; + @Nullable private final UTF8BytesString serviceSource; + private final UTF8BytesString type; + private final UTF8BytesString spanKind; + @Nullable private final UTF8BytesString httpMethod; + @Nullable private final UTF8BytesString httpEndpoint; + @Nullable private final UTF8BytesString grpcStatusCode; + private final short httpStatusCode; + + /** Whether the root span carried the {@code synthetics} origin tag (synthetic-monitoring run). */ + private final boolean synthetic; + + /** Whether this span is the trace root ({@code parentId == 0}). */ + private final boolean traceRoot; + + // Peer tags carried in two forms: parallel String[] arrays mirroring the snapshot's (schema + + // values) shape for matches(), and pre-encoded List ("name:value") for the + // serializer. peerTagNames is the schema's names array (shared by-reference when the schema + // hasn't been replaced); peerTagValues is the per-span String[] parallel to it. + // + // Package-private so the in-package test helper (AggregateEntryTestUtils) can compare entries + // by raw layout; production access comes from this class's own matches() + constructor. + @Nullable final String[] peerTagNames; + @Nullable final String[] peerTagValues; + private final List peerTags; + + // Mutable aggregate state -- single-thread (consumer/aggregator) writer. + private final Histogram okLatencies = Histogram.newHistogram(); + private final Histogram errorLatencies = Histogram.newHistogram(); + private int errorCount; + private int hitCount; + private int topLevelCount; + private long duration; + + /** Hot-path constructor for the producer/consumer flow. Builds UTF8 fields via the caches. */ + AggregateEntry(SpanSnapshot s, long keyHash) { + super(keyHash); + this.resource = canonicalize(RESOURCE_CACHE, s.resourceName); + this.service = canonicalize(SERVICE_CACHE, s.serviceName); + this.operationName = canonicalize(OPERATION_CACHE, s.operationName); + this.serviceSource = canonicalizeOptional(SERVICE_SOURCE_CACHE, s.serviceNameSource); + this.type = canonicalize(TYPE_CACHE, s.spanType); + this.spanKind = canonicalize(SPAN_KIND_CACHE, s.spanKind); + this.httpMethod = canonicalizeOptional(HTTP_METHOD_CACHE, s.httpMethod); + this.httpEndpoint = canonicalizeOptional(HTTP_ENDPOINT_CACHE, s.httpEndpoint); + this.grpcStatusCode = canonicalizeOptional(GRPC_STATUS_CODE_CACHE, s.grpcStatusCode); + this.httpStatusCode = s.httpStatusCode; + this.synthetic = s.synthetic; + this.traceRoot = s.traceRoot; + this.peerTagNames = s.peerTagSchema == null ? null : s.peerTagSchema.names; + this.peerTagValues = s.peerTagValues; + this.peerTags = materializePeerTags(this.peerTagNames, this.peerTagValues); + } + + /** + * Records a single hit. {@code tagAndDuration} carries the duration nanos with optional {@link + * #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits OR-ed in. + */ + void recordOneDuration(long tagAndDuration) { + ++hitCount; + if ((tagAndDuration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { + tagAndDuration ^= TOP_LEVEL_TAG; + ++topLevelCount; + } + if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) { + tagAndDuration ^= ERROR_TAG; + errorLatencies.accept(tagAndDuration); + ++errorCount; + } else { + okLatencies.accept(tagAndDuration); + } + duration += tagAndDuration; + } + + int getErrorCount() { + return errorCount; + } + + int getHitCount() { + return hitCount; + } + + int getTopLevelCount() { + return topLevelCount; + } + + long getDuration() { + return duration; + } + + Histogram getOkLatencies() { + return okLatencies; + } + + Histogram getErrorLatencies() { + return errorLatencies; + } + + /** + * Resets the per-cycle counters and histograms. Label fields ({@code resource}, {@code service}, + * ..., {@code peerTagNames}, {@code peerTagValues}) are deliberately left intact -- they're the + * entry's bucket identity and must persist so a subsequent snapshot with the same key reuses this + * entry instead of allocating a fresh one. Entries that stay at {@code hitCount == 0} across a + * cycle are reaped by {@link AggregateTable#expungeStaleAggregates}. + */ + void clear() { + this.errorCount = 0; + this.hitCount = 0; + this.topLevelCount = 0; + this.duration = 0; + this.okLatencies.clear(); + this.errorLatencies.clear(); + } + + boolean matches(SpanSnapshot s) { + String[] snapshotNames = s.peerTagSchema == null ? null : s.peerTagSchema.names; + return httpStatusCode == s.httpStatusCode + && synthetic == s.synthetic + && traceRoot == s.traceRoot + && contentEquals(resource, s.resourceName) + && contentEquals(service, s.serviceName) + && contentEquals(operationName, s.operationName) + && contentEquals(serviceSource, s.serviceNameSource) + && contentEquals(type, s.spanType) + && contentEquals(spanKind, s.spanKind) + && Arrays.equals(peerTagNames, snapshotNames) + && Arrays.equals(peerTagValues, s.peerTagValues) + && contentEquals(httpMethod, s.httpMethod) + && contentEquals(httpEndpoint, s.httpEndpoint) + && contentEquals(grpcStatusCode, s.grpcStatusCode); + } + + /** + * Pre-checks {@link #keyHash} against {@code keyHash} before delegating to {@link + * #matches(SpanSnapshot)}. The hash check is cheap and rules out most mismatches without touching + * the field-by-field comparison. + */ + boolean matches(long keyHash, SpanSnapshot s) { + return this.keyHash == keyHash && matches(s); + } + + /** + * Computes the 64-bit lookup hash for a {@link SpanSnapshot}. Chained per-field calls -- no + * varargs / Object[] allocation, no autoboxing on primitive overloads. The constructor's + * super({@code hashOf(s)}) call uses the same function so an entry built from a snapshot hashes + * to the same bucket the snapshot itself looks up. + * + *

Hashes are content-stable across {@code String} / {@code UTF8BytesString}: {@link + * UTF8BytesString#hashCode()} returns the underlying {@code String}'s hash. + */ + static long hashOf(SpanSnapshot s) { + long h = 0; + h = LongHashingUtils.addToHash(h, s.resourceName); + h = LongHashingUtils.addToHash(h, s.serviceName); + h = LongHashingUtils.addToHash(h, s.operationName); + h = LongHashingUtils.addToHash(h, s.serviceNameSource); + h = LongHashingUtils.addToHash(h, s.spanType); + h = LongHashingUtils.addToHash(h, s.httpStatusCode); + h = LongHashingUtils.addToHash(h, s.synthetic); + h = LongHashingUtils.addToHash(h, s.traceRoot); + h = LongHashingUtils.addToHash(h, s.spanKind); + // Always mix in both the schema's content hash and the values' content hash, unconditionally + // (no null-skip). Arrays.hashCode is content-based for both String[]s; the default + // Object[].hashCode is identity-based, which would let two snapshots with content-equal but + // distinct PeerTagSchema instances hash to different buckets. Null inputs hash to 0 here, + // distinct from {@code Arrays.hashCode(empty)} = 1 or any non-empty array. + h = + LongHashingUtils.addToHash( + h, s.peerTagSchema == null ? 0 : Arrays.hashCode(s.peerTagSchema.names)); + h = LongHashingUtils.addToHash(h, Arrays.hashCode(s.peerTagValues)); + h = LongHashingUtils.addToHash(h, s.httpMethod); + h = LongHashingUtils.addToHash(h, s.httpEndpoint); + h = LongHashingUtils.addToHash(h, s.grpcStatusCode); + return h; + } + + // Accessors for SerializingMetricWriter. + UTF8BytesString getResource() { + return resource; + } + + UTF8BytesString getService() { + return service; + } + + UTF8BytesString getOperationName() { + return operationName; + } + + @Nullable + UTF8BytesString getServiceSource() { + return serviceSource; + } + + UTF8BytesString getType() { + return type; + } + + UTF8BytesString getSpanKind() { + return spanKind; + } + + @Nullable + UTF8BytesString getHttpMethod() { + return httpMethod; + } + + @Nullable + UTF8BytesString getHttpEndpoint() { + return httpEndpoint; + } + + @Nullable + UTF8BytesString getGrpcStatusCode() { + return grpcStatusCode; + } + + int getHttpStatusCode() { + return httpStatusCode; + } + + boolean isSynthetics() { + return synthetic; + } + + boolean isTraceRoot() { + return traceRoot; + } + + List getPeerTags() { + return peerTags; + } + + // Production AggregateEntry intentionally has no equals/hashCode override -- AggregateTable + // bucketing uses keyHash + matches(SpanSnapshot) directly and never invokes Object.equals. + // For tests that need value-equality (Spock argument matchers), use AggregateEntryTestUtils in + // src/test, which provides equals/hashCode helpers without exposing the contract in production. + + // ----- helpers ----- + + private static UTF8BytesString canonicalize( + DDCache cache, CharSequence charSeq) { + if (charSeq == null) { + return EMPTY; + } + if (charSeq instanceof UTF8BytesString) { + return (UTF8BytesString) charSeq; + } + return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create); + } + + /** + * Like {@link #canonicalize} but returns {@code null} for a {@code null} input (rather than + * {@link UTF8BytesString#EMPTY}). Used for the four optional fields so the serializer can + * distinguish "absent" via a {@code != null} check and elide the field on the wire. + * + *

The {@code instanceof UTF8BytesString} short-circuit is dead code for {@link + * SpanSnapshot#httpMethod}/{@code httpEndpoint}/{@code grpcStatusCode} (statically {@code + * String}) but live for {@link SpanSnapshot#serviceNameSource} ({@link CharSequence}); keeping a + * single helper keeps the constructor consistent. + */ + @Nullable + private static UTF8BytesString canonicalizeOptional( + DDCache cache, @Nullable CharSequence charSeq) { + if (charSeq == null) { + return null; + } + if (charSeq instanceof UTF8BytesString) { + return (UTF8BytesString) charSeq; + } + return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create); + } + + /** + * UTF8 vs raw CharSequence content-equality, no allocation in the common (String) case. + * + *

Treats {@code null} and empty (length 0) as equivalent on either side. This matches the + * canonicalization semantics: {@link #canonicalize} maps a {@code null} input to {@link + * UTF8BytesString#EMPTY}, so an entry built from a snapshot with a null field needs to match a + * subsequent snapshot whose field is still null. {@code intHash(null) == 0 == "".hashCode()}, so + * the hash already agrees with this view. + */ + private static boolean contentEquals(UTF8BytesString a, CharSequence b) { + if (a == null || a.length() == 0) { + return b == null || b.length() == 0; + } + // UTF8BytesString.toString() returns the underlying String -- O(1), no allocation. + return b != null && a.toString().contentEquals(b); + } + + /** + * Encodes the per-span peer-tag values into the {@code List} the serializer + * consumes. Reads name/value pairs at the same index from the schema's names and the snapshot's + * values; null value slots are skipped (the span didn't set that peer tag). Counts hits once for + * exact-size allocation and preserves the singletonList fast path for the common one-entry case + * (e.g. internal-kind base.service). + */ + private static List materializePeerTags( + @Nullable String[] names, @Nullable String[] values) { + if (names == null || values == null) { + return Collections.emptyList(); + } + int n = names.length; + int firstHit = -1; + int hitCount = 0; + for (int i = 0; i < n; i++) { + if (values[i] != null) { + if (hitCount == 0) firstHit = i; + hitCount++; + } + } + if (hitCount == 0) { + return Collections.emptyList(); + } + if (hitCount == 1) { + return Collections.singletonList(encodePeerTag(names[firstHit], values[firstHit])); + } + List tags = new ArrayList<>(hitCount); + for (int i = firstHit; i < n; i++) { + if (values[i] != null) { + tags.add(encodePeerTag(names[i], values[i])); + } + } + return tags; + } + + private static UTF8BytesString encodePeerTag(String name, String value) { + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); + return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java deleted file mode 100644 index dba66a5ab9c..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java +++ /dev/null @@ -1,103 +0,0 @@ -package datadog.trace.common.metrics; - -import datadog.metrics.api.Histogram; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.concurrent.atomic.AtomicLongArray; - -/** Not thread-safe. Accumulates counts and durations. */ -@SuppressFBWarnings( - value = {"AT_NONATOMIC_OPERATIONS_ON_SHARED_VARIABLE", "AT_STALE_THREAD_WRITE_OF_PRIMITIVE"}, - justification = "Explicitly not thread-safe. Accumulates counts and durations.") -public final class AggregateMetric { - - static final long ERROR_TAG = 0x8000000000000000L; - static final long TOP_LEVEL_TAG = 0x4000000000000000L; - - private final Histogram okLatencies; - private final Histogram errorLatencies; - private int errorCount; - private int hitCount; - private int topLevelCount; - private long duration; - - public AggregateMetric() { - okLatencies = Histogram.newHistogram(); - errorLatencies = Histogram.newHistogram(); - } - - public AggregateMetric recordDurations(int count, AtomicLongArray durations) { - this.hitCount += count; - for (int i = 0; i < count && i < durations.length(); ++i) { - long duration = durations.getAndSet(i, 0); - if ((duration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { - duration ^= TOP_LEVEL_TAG; - ++topLevelCount; - } - if ((duration & ERROR_TAG) == ERROR_TAG) { - // then it's an error - duration ^= ERROR_TAG; - errorLatencies.accept(duration); - ++errorCount; - } else { - okLatencies.accept(duration); - } - this.duration += duration; - } - return this; - } - - /** - * Records a single hit. {@code tagAndDuration} carries the duration nanos with optional {@link - * #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits OR-ed in. - */ - public AggregateMetric recordOneDuration(long tagAndDuration) { - ++hitCount; - if ((tagAndDuration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { - tagAndDuration ^= TOP_LEVEL_TAG; - ++topLevelCount; - } - if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) { - tagAndDuration ^= ERROR_TAG; - errorLatencies.accept(tagAndDuration); - ++errorCount; - } else { - okLatencies.accept(tagAndDuration); - } - duration += tagAndDuration; - return this; - } - - public int getErrorCount() { - return errorCount; - } - - public int getHitCount() { - return hitCount; - } - - public int getTopLevelCount() { - return topLevelCount; - } - - public long getDuration() { - return duration; - } - - public Histogram getOkLatencies() { - return okLatencies; - } - - public Histogram getErrorLatencies() { - return errorLatencies; - } - - @SuppressFBWarnings("AT_NONATOMIC_64BIT_PRIMITIVE") - public void clear() { - this.errorCount = 0; - this.hitCount = 0; - this.topLevelCount = 0; - this.duration = 0; - this.okLatencies.clear(); - this.errorLatencies.clear(); - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java new file mode 100644 index 00000000000..abadc7e5f17 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java @@ -0,0 +1,140 @@ +package datadog.trace.common.metrics; + +import datadog.trace.util.Hashtable; +import datadog.trace.util.Hashtable.MutatingTableIterator; +import datadog.trace.util.Hashtable.Support; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +/** + * Consumer-side {@link AggregateEntry} store, keyed on the raw fields of a {@link SpanSnapshot}. + * + *

Replaces the prior {@code LRUCache}. The win is on the + * steady-state hit path: a snapshot lookup is a 64-bit hash compute + bucket walk + field-wise + * {@code matches}, with no per-snapshot {@link AggregateEntry} allocation and no UTF8 cache + * lookups. The UTF8-encoded forms (formerly held on {@code MetricKey}) and the mutable counters + * (formerly held on {@code AggregateMetric}) both live on the {@link AggregateEntry} now, built + * once per unique key at insert time. + * + *

Not thread-safe. The aggregator thread is the sole writer of both this table and its + * contained {@link AggregateEntry} state. Any cross-thread request that needs to mutate -- e.g. + * {@link ConflatingMetricsAggregator#disable()} -- must funnel onto the aggregator thread via the + * inbox (see the {@code ClearSignal} routing in {@link Aggregator}). The invariant is convention- + * enforced; nothing here checks the calling thread at runtime, so a wrong-thread call would corrupt + * bucket chains silently. + */ +final class AggregateTable { + + private final Hashtable.Entry[] buckets; + private final int maxAggregates; + private int size; + + /** + * Bucket index where the last {@link #evictOneStale} successfully removed an entry. The next call + * resumes from this bucket so a fast-evicting workload doesn't repeatedly re-walk the same hot + * entries clustered near bucket 0. Reset to {@code 0} by {@link #clear}. + */ + private int evictCursor; + + AggregateTable(int maxAggregates) { + this.buckets = Support.create(maxAggregates, Support.MAX_RATIO); + this.maxAggregates = maxAggregates; + } + + int size() { + return size; + } + + boolean isEmpty() { + return size == 0; + } + + /** + * Returns the {@link AggregateEntry} to update for {@code snapshot}, lazily creating one on miss. + * Returns {@code null} when the table is at capacity and no stale entry can be evicted -- the + * caller should drop the data point in that case. + */ + AggregateEntry findOrInsert(SpanSnapshot snapshot) { + long keyHash = AggregateEntry.hashOf(snapshot); + for (AggregateEntry candidate = Support.bucket(buckets, keyHash); + candidate != null; + candidate = candidate.next()) { + if (candidate.matches(keyHash, snapshot)) { + return candidate; + } + } + if (size >= maxAggregates && !evictOneStale()) { + return null; + } + AggregateEntry entry = new AggregateEntry(snapshot, keyHash); + Support.insertHeadEntry(buckets, keyHash, entry); + size++; + return entry; + } + + /** + * Unlinks the first entry whose {@code getHitCount() == 0}, resuming the scan from {@link + * #evictCursor} so back-to-back evictions amortize to O(1) per call. Worst case for a single call + * is still O(N) when nearly every entry is hot, but a sustained eviction stream never re-scans + * the hot prefix more than twice across N evictions. + * + *

The semantic intent: at cap with all entries live, drop the new key (reported via {@code + * onStatsAggregateDropped}) rather than evicting an established one. Cap is sized to the + * steady-state working set, so eviction is rare; this cursor optimization handles the + * pathological "persistently at cap" case. + */ + private boolean evictOneStale() { + // Two passes -- [cursor, length) then [0, cursor) -- using the half-open-range iterator. The + // second pass is naturally empty when cursor==0, so no extra check needed. + return evictOneStaleInRange(evictCursor, buckets.length) + || evictOneStaleInRange(0, evictCursor); + } + + /** Scans {@code [startBucket, endBucket)} for the first stale entry and unlinks it. */ + private boolean evictOneStaleInRange(int startBucket, int endBucket) { + MutatingTableIterator iter = + Support.mutatingTableIterator(buckets, startBucket, endBucket); + while (iter.hasNext()) { + AggregateEntry e = iter.next(); + if (e.getHitCount() == 0) { + int bucket = iter.currentBucket(); + iter.remove(); + size--; + evictCursor = bucket; + return true; + } + } + return false; + } + + void forEach(Consumer consumer) { + Support.forEach(buckets, consumer); + } + + /** + * Context-passing forEach. Useful for callers that want to avoid a capturing-lambda allocation on + * each invocation -- pass a non-capturing {@link BiConsumer} (typically a {@code static final}) + * plus whatever side-band state it needs as {@code context}. + */ + void forEach(T context, BiConsumer consumer) { + Support.forEach(buckets, context, consumer); + } + + /** Removes entries whose {@code getHitCount() == 0}. */ + void expungeStaleAggregates() { + for (MutatingTableIterator iter = Support.mutatingTableIterator(buckets); + iter.hasNext(); ) { + AggregateEntry e = iter.next(); + if (e.getHitCount() == 0) { + iter.remove(); + size--; + } + } + } + + void clear() { + Support.clear(buckets); + size = 0; + evictCursor = 0; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index 9998c21ed0b..d809d452522 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -1,26 +1,12 @@ package datadog.trace.common.metrics; -import static datadog.trace.api.Functions.UTF8_ENCODE; -import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE; -import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE_ADDER; -import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SERVICE_NAMES; -import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SPAN_KINDS; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import datadog.trace.api.Pair; -import datadog.trace.api.cache.DDCache; -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.common.metrics.SignalItem.ClearSignal; import datadog.trace.common.metrics.SignalItem.StopSignal; import datadog.trace.core.monitor.HealthMetrics; -import datadog.trace.core.util.LRUCache; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +18,9 @@ final class Aggregator implements Runnable { private static final Logger log = LoggerFactory.getLogger(Aggregator.class); private final MessagePassingQueue inbox; - private final LRUCache aggregates; + private final AggregateTable aggregates; private final MetricWriter writer; + private final HealthMetrics healthMetrics; // the reporting interval controls how much history will be buffered // when the agent is unresponsive (only 10 pending requests will be // buffered by OkHttpSink) @@ -85,34 +72,13 @@ final class Aggregator implements Runnable { Runnable onReportCycle) { this.writer = writer; this.inbox = inbox; - this.aggregates = - new LRUCache<>( - new AggregateExpiry(healthMetrics), maxAggregates * 4 / 3, 0.75f, maxAggregates); + this.aggregates = new AggregateTable(maxAggregates); this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval); this.sleepMillis = sleepMillis; + this.healthMetrics = healthMetrics; this.onReportCycle = onReportCycle; } - private static final class AggregateExpiry - implements LRUCache.ExpiryListener { - private final HealthMetrics healthMetrics; - - AggregateExpiry(HealthMetrics healthMetrics) { - this.healthMetrics = healthMetrics; - } - - @Override - public void accept(Map.Entry expired) { - if (expired.getValue().getHitCount() > 0) { - healthMetrics.onStatsAggregateDropped(); - } - } - } - - public void clearAggregates() { - this.aggregates.clear(); - } - @Override public void run() { Thread currentThread = Thread.currentThread(); @@ -139,7 +105,31 @@ private final class Drainer implements MessagePassingQueue.Consumer { @Override public void accept(InboxItem item) { - if (item instanceof SignalItem) { + if (item == ClearSignal.CLEAR) { + // ClearSignal is routed through the inbox (rather than letting the caller mutate + // AggregateTable directly) so the aggregator thread stays the sole writer. AggregateTable + // is not thread-safe; a direct clear() from e.g. the OkHttpSink callback thread would + // race with Drainer.accept on this thread. + // + // We deliberately do NOT call inbox.clear() here. Doing so would erase any queued STOP + // (or REPORT) signals that happen to sit behind CLEAR -- a real concern when a + // downgrade is followed quickly by close(), where the trampled STOP leaves the + // aggregator thread spinning until thread.join times out. features.supportsMetrics() is + // already false by the time CLEAR was offered, so producers have stopped publishing; + // any in-flight snapshots will drain naturally into the just-cleared table, get + // re-aggregated, and flushed on the next report -- where the agent rejects them again, + // triggering another DOWNGRADED -> disable() -> CLEAR cycle. Worst case: one extra + // reporting cycle of wasted work, which we accept for the safety of preserving STOP. + if (!stopped) { + aggregates.clear(); + // Clear dirty too -- without this, the next report() would see dirty=true, run + // expungeStaleAggregates against the (now-empty) table, find isEmpty()=true, and skip + // the flush anyway. Same observable outcome, but resetting here keeps the invariant + // "dirty implies there's data to flush" honest. + dirty = false; + } + ((SignalItem) item).complete(); + } else if (item instanceof SignalItem) { SignalItem signal = (SignalItem) item; if (!stopped) { report(wallClockTime(), signal); @@ -152,74 +142,16 @@ public void accept(InboxItem item) { } } else if (item instanceof SpanSnapshot && !stopped) { SpanSnapshot snapshot = (SpanSnapshot) item; - MetricKey key = buildMetricKey(snapshot); - AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric()); - aggregate.recordOneDuration(snapshot.tagAndDuration); - dirty = true; - } - } - } - - private static MetricKey buildMetricKey(SpanSnapshot s) { - return new MetricKey( - s.resourceName, - SERVICE_NAMES.computeIfAbsent(s.serviceName, UTF8_ENCODE), - s.operationName, - s.serviceNameSource, - s.spanType, - s.httpStatusCode, - s.synthetic, - s.traceRoot, - SPAN_KINDS.computeIfAbsent(s.spanKind, UTF8BytesString::create), - materializePeerTags(s.peerTagSchema, s.peerTagValues), - s.httpMethod, - s.httpEndpoint, - s.grpcStatusCode); - } - - /** - * Encodes the per-span peer-tag values into the {@code List} the {@link - * MetricKey} consumes. Reads name/value pairs at the same index from the schema's names and the - * snapshot's values; null value slots are skipped (the span didn't set that peer tag). - */ - private static List materializePeerTags(PeerTagSchema schema, String[] values) { - if (schema == null || values == null) { - return Collections.emptyList(); - } - String[] names = schema.names; - int n = names.length; - // First pass: count how many tags fired and remember the first index. The single-entry case - // is common (e.g. INTERNAL spans only emit base.service) and gets a singletonList to avoid an - // ArrayList allocation on the hot path. - int firstHit = -1; - int hitCount = 0; - for (int i = 0; i < n; i++) { - if (values[i] != null) { - if (hitCount == 0) { - firstHit = i; + AggregateEntry entry = aggregates.findOrInsert(snapshot); + if (entry != null) { + entry.recordOneDuration(snapshot.tagAndDuration); + dirty = true; + } else { + // table at cap with no stale entry available to evict + healthMetrics.onStatsAggregateDropped(); } - hitCount++; - } - } - if (hitCount == 0) { - return Collections.emptyList(); - } - if (hitCount == 1) { - return Collections.singletonList(encodePeerTag(names[firstHit], values[firstHit])); - } - List tags = new ArrayList<>(hitCount); - for (int i = firstHit; i < n; i++) { - if (values[i] != null) { - tags.add(encodePeerTag(names[i], values[i])); } } - return tags; - } - - private static UTF8BytesString encodePeerTag(String name, String value) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); - return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); } private void report(long when, SignalItem signal) { @@ -234,14 +166,16 @@ private void report(long when, SignalItem signal) { boolean skipped = true; if (dirty) { try { - expungeStaleAggregates(); + aggregates.expungeStaleAggregates(); if (!aggregates.isEmpty()) { skipped = false; writer.startBucket(aggregates.size(), when, reportingIntervalNanos); - for (Map.Entry aggregate : aggregates.entrySet()) { - writer.add(aggregate.getKey(), aggregate.getValue()); - aggregate.getValue().clear(); - } + aggregates.forEach( + writer, + (w, entry) -> { + w.add(entry); + entry.clear(); + }); // note that this may do IO and block writer.finishBucket(); } @@ -257,17 +191,6 @@ private void report(long when, SignalItem signal) { } } - private void expungeStaleAggregates() { - Iterator> it = aggregates.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry pair = it.next(); - AggregateMetric metric = pair.getValue(); - if (metric.getHitCount() == 0) { - it.remove(); - } - } - } - private long wallClockTime() { return MILLISECONDS.toNanos(System.currentTimeMillis()); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index dc5d698bcc1..895ee434854 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -5,8 +5,9 @@ import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; -import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; -import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; +import static datadog.trace.common.metrics.AggregateEntry.ERROR_TAG; +import static datadog.trace.common.metrics.AggregateEntry.TOP_LEVEL_TAG; +import static datadog.trace.common.metrics.SignalItem.ClearSignal.CLEAR; import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT; import static datadog.trace.common.metrics.SignalItem.StopSignal.STOP; import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR; @@ -18,12 +19,8 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; -import datadog.trace.api.Pair; import datadog.trace.api.WellKnownTags; -import datadog.trace.api.cache.DDCache; -import datadog.trace.api.cache.DDCaches; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.SignalItem.ReportSignal; import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.core.CoreSpan; @@ -39,7 +36,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,22 +47,6 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final Map DEFAULT_HEADERS = Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); - static final DDCache SERVICE_NAMES = DDCaches.newFixedSizeCache(32); - - static final DDCache SPAN_KINDS = DDCaches.newFixedSizeCache(16); - static final DDCache< - String, Pair, Function>> - PEER_TAGS_CACHE = - DDCaches.newFixedSizeCache( - 64); // it can be unbounded since those values are returned by the agent and should be - // under control. 64 entries is enough in this case to contain all the peer tags. - static final Function< - String, Pair, Function>> - PEER_TAGS_CACHE_ADDER = - key -> - Pair.of( - DDCaches.newFixedSizeCache(512), - value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; private static final SpanKindFilter METRICS_ELIGIBLE_KINDS = @@ -532,8 +512,17 @@ private void disable() { features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); - this.inbox.clear(); - this.aggregator.clearAggregates(); + // Route the clear through the inbox so the aggregator thread is the only writer. + // AggregateTable is not thread-safe; mutating it directly from this thread would race + // with Drainer.accept on the aggregator thread. + // + // Best-effort single offer rather than the retry-loop pattern in report(). If the inbox is + // full at downgrade time the clear is dropped, but the system self-heals: features.discover() + // already flipped supportsMetrics() false, so producer publish() calls now skip the inbox; + // the aggregator drains existing snapshots and ships them on the next report cycle; the + // sink rejects that payload and fires DOWNGRADED again, which retries disable() against a + // now-empty inbox. Worst case: one extra reporting cycle of stale data. + inbox.offer(CLEAR); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java index 7d66cad6a15..e7c37f91768 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java @@ -4,6 +4,17 @@ interface InboxItem {} +/** + * Inbox-routed control message. Each subclass exposes a process-wide {@code static final} singleton + * ({@link StopSignal#STOP}, {@link ReportSignal#REPORT}, {@link ClearSignal#CLEAR}) for the common + * fire-and-forget case and is also directly instantiable when a caller needs to await handling. + * + *

Singletons are fire-and-forget. The inherited {@link #future} is completed on first + * handling by the aggregator thread and never reset, so a second posting of the same singleton + * cannot signal completion to a fresh awaiter -- the future is already done. Callers that want + * one-shot completion semantics (e.g. {@code forceReport()}) must allocate a fresh instance ({@code + * new ReportSignal()}) rather than reusing the singleton. + */ abstract class SignalItem implements InboxItem { final CompletableFuture future; @@ -20,12 +31,26 @@ void ignore() { } static final class StopSignal extends SignalItem { + /** Fire-and-forget singleton. See class-level note on {@link SignalItem}. */ static final StopSignal STOP = new StopSignal(); private StopSignal() {} } static final class ReportSignal extends SignalItem { + /** Fire-and-forget singleton; {@code forceReport()} allocates fresh instances. */ static final ReportSignal REPORT = new ReportSignal(); } + + /** + * Posted from arbitrary threads (e.g. the Sink event thread during agent downgrade) so the + * aggregator thread is the one that actually performs the table reset. Keeps {@link + * AggregateTable} and {@code inbox.clear()} single-writer. + */ + static final class ClearSignal extends SignalItem { + /** Fire-and-forget singleton. See class-level note on {@link SignalItem}. */ + static final ClearSignal CLEAR = new ClearSignal(); + + private ClearSignal() {} + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java deleted file mode 100644 index 9e2e2098d1f..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java +++ /dev/null @@ -1,178 +0,0 @@ -package datadog.trace.common.metrics; - -import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY; - -import datadog.trace.api.cache.DDCache; -import datadog.trace.api.cache.DDCaches; -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; -import datadog.trace.util.HashingUtils; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -/** The aggregation key for tracked metrics. */ -public final class MetricKey { - static final DDCache RESOURCE_CACHE = DDCaches.newFixedSizeCache(32); - static final DDCache SERVICE_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache SERVICE_SOURCE_CACHE = - DDCaches.newFixedSizeCache(16); - static final DDCache OPERATION_CACHE = DDCaches.newFixedSizeCache(64); - static final DDCache TYPE_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache KIND_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache HTTP_METHOD_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache HTTP_ENDPOINT_CACHE = - DDCaches.newFixedSizeCache(32); - static final DDCache GRPC_STATUS_CODE_CACHE = - DDCaches.newFixedSizeCache(32); - - private final UTF8BytesString resource; - private final UTF8BytesString service; - private final UTF8BytesString serviceSource; - private final UTF8BytesString operationName; - private final UTF8BytesString type; - private final int httpStatusCode; - private final boolean synthetics; - private final int hash; - private final boolean isTraceRoot; - private final UTF8BytesString spanKind; - private final List peerTags; - private final UTF8BytesString httpMethod; - private final UTF8BytesString httpEndpoint; - private final UTF8BytesString grpcStatusCode; - - public MetricKey( - CharSequence resource, - CharSequence service, - CharSequence operationName, - CharSequence serviceSource, - CharSequence type, - int httpStatusCode, - boolean synthetics, - boolean isTraceRoot, - CharSequence spanKind, - List peerTags, - CharSequence httpMethod, - CharSequence httpEndpoint, - CharSequence grpcStatusCode) { - this.resource = null == resource ? EMPTY : utf8(RESOURCE_CACHE, resource); - this.service = null == service ? EMPTY : utf8(SERVICE_CACHE, service); - this.serviceSource = null == serviceSource ? null : utf8(SERVICE_SOURCE_CACHE, serviceSource); - this.operationName = null == operationName ? EMPTY : utf8(OPERATION_CACHE, operationName); - this.type = null == type ? EMPTY : utf8(TYPE_CACHE, type); - this.httpStatusCode = httpStatusCode; - this.synthetics = synthetics; - this.isTraceRoot = isTraceRoot; - this.spanKind = null == spanKind ? EMPTY : utf8(KIND_CACHE, spanKind); - this.peerTags = peerTags == null ? Collections.emptyList() : peerTags; - this.httpMethod = httpMethod == null ? null : utf8(HTTP_METHOD_CACHE, httpMethod); - this.httpEndpoint = httpEndpoint == null ? null : utf8(HTTP_ENDPOINT_CACHE, httpEndpoint); - this.grpcStatusCode = - grpcStatusCode == null ? null : utf8(GRPC_STATUS_CODE_CACHE, grpcStatusCode); - - int tmpHash = 0; - tmpHash = HashingUtils.addToHash(tmpHash, this.isTraceRoot); - tmpHash = HashingUtils.addToHash(tmpHash, this.spanKind); - tmpHash = HashingUtils.addToHash(tmpHash, this.peerTags); - tmpHash = HashingUtils.addToHash(tmpHash, this.resource); - tmpHash = HashingUtils.addToHash(tmpHash, this.service); - tmpHash = HashingUtils.addToHash(tmpHash, this.operationName); - tmpHash = HashingUtils.addToHash(tmpHash, this.type); - tmpHash = HashingUtils.addToHash(tmpHash, this.httpStatusCode); - tmpHash = HashingUtils.addToHash(tmpHash, this.synthetics); - tmpHash = HashingUtils.addToHash(tmpHash, this.serviceSource); - tmpHash = HashingUtils.addToHash(tmpHash, this.httpEndpoint); - tmpHash = HashingUtils.addToHash(tmpHash, this.httpMethod); - tmpHash = HashingUtils.addToHash(tmpHash, this.grpcStatusCode); - this.hash = tmpHash; - } - - static UTF8BytesString utf8(DDCache cache, CharSequence charSeq) { - if (charSeq instanceof UTF8BytesString) { - return (UTF8BytesString) charSeq; - } else { - return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create); - } - } - - public UTF8BytesString getResource() { - return resource; - } - - public UTF8BytesString getService() { - return service; - } - - public UTF8BytesString getServiceSource() { - return serviceSource; - } - - public UTF8BytesString getOperationName() { - return operationName; - } - - public UTF8BytesString getType() { - return type; - } - - public int getHttpStatusCode() { - return httpStatusCode; - } - - public boolean isSynthetics() { - return synthetics; - } - - public boolean isTraceRoot() { - return isTraceRoot; - } - - public UTF8BytesString getSpanKind() { - return spanKind; - } - - public List getPeerTags() { - return peerTags; - } - - public UTF8BytesString getHttpMethod() { - return httpMethod; - } - - public UTF8BytesString getHttpEndpoint() { - return httpEndpoint; - } - - public UTF8BytesString getGrpcStatusCode() { - return grpcStatusCode; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if ((o instanceof MetricKey)) { - MetricKey metricKey = (MetricKey) o; - return hash == metricKey.hash - && synthetics == metricKey.synthetics - && httpStatusCode == metricKey.httpStatusCode - && resource.equals(metricKey.resource) - && service.equals(metricKey.service) - && operationName.equals(metricKey.operationName) - && type.equals(metricKey.type) - && isTraceRoot == metricKey.isTraceRoot - && spanKind.equals(metricKey.spanKind) - && peerTags.equals(metricKey.peerTags) - && Objects.equals(serviceSource, metricKey.serviceSource) - && Objects.equals(httpMethod, metricKey.httpMethod) - && Objects.equals(httpEndpoint, metricKey.httpEndpoint) - && Objects.equals(grpcStatusCode, metricKey.grpcStatusCode); - } - return false; - } - - @Override - public int hashCode() { - return hash; - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java index fa26ed2e5db..905ba498760 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java @@ -3,7 +3,11 @@ public interface MetricWriter { void startBucket(int metricCount, long start, long duration); - void add(MetricKey key, AggregateMetric aggregate); + /** + * Serialize one aggregate. The {@link AggregateEntry} carries both the label fields (resource, + * service, span.kind, peer tags, etc.) and the counters being reported. + */ + void add(AggregateEntry entry); void finishBucket(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 0f84964e9db..7644ebaf044 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -142,12 +142,12 @@ public void startBucket(int metricCount, long start, long duration) { } @Override - public void add(MetricKey key, AggregateMetric aggregate) { + public void add(AggregateEntry entry) { // Calculate dynamic map size based on optional fields - final boolean hasHttpMethod = key.getHttpMethod() != null; - final boolean hasHttpEndpoint = key.getHttpEndpoint() != null; - final boolean hasServiceSource = key.getServiceSource() != null; - final boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null; + final boolean hasHttpMethod = entry.getHttpMethod() != null; + final boolean hasHttpEndpoint = entry.getHttpEndpoint() != null; + final boolean hasServiceSource = entry.getServiceSource() != null; + final boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != null; final int mapSize = 15 + (hasServiceSource ? 1 : 0) @@ -158,31 +158,31 @@ public void add(MetricKey key, AggregateMetric aggregate) { writer.startMap(mapSize); writer.writeUTF8(NAME); - writer.writeUTF8(key.getOperationName()); + writer.writeUTF8(entry.getOperationName()); writer.writeUTF8(SERVICE); - writer.writeUTF8(key.getService()); + writer.writeUTF8(entry.getService()); writer.writeUTF8(RESOURCE); - writer.writeUTF8(key.getResource()); + writer.writeUTF8(entry.getResource()); writer.writeUTF8(TYPE); - writer.writeUTF8(key.getType()); + writer.writeUTF8(entry.getType()); writer.writeUTF8(HTTP_STATUS_CODE); - writer.writeInt(key.getHttpStatusCode()); + writer.writeInt(entry.getHttpStatusCode()); writer.writeUTF8(SYNTHETICS); - writer.writeBoolean(key.isSynthetics()); + writer.writeBoolean(entry.isSynthetics()); writer.writeUTF8(IS_TRACE_ROOT); - writer.writeInt(key.isTraceRoot() ? TRISTATE_TRUE : TRISTATE_FALSE); + writer.writeInt(entry.isTraceRoot() ? TRISTATE_TRUE : TRISTATE_FALSE); writer.writeUTF8(SPAN_KIND); - writer.writeUTF8(key.getSpanKind()); + writer.writeUTF8(entry.getSpanKind()); writer.writeUTF8(PEER_TAGS); - final List peerTags = key.getPeerTags(); + final List peerTags = entry.getPeerTags(); writer.startArray(peerTags.size()); for (UTF8BytesString peerTag : peerTags) { @@ -191,43 +191,43 @@ public void add(MetricKey key, AggregateMetric aggregate) { if (hasServiceSource) { writer.writeUTF8(SERVICE_SOURCE); - writer.writeUTF8(key.getServiceSource()); + writer.writeUTF8(entry.getServiceSource()); } // Only include HTTPMethod if present if (hasHttpMethod) { writer.writeUTF8(HTTP_METHOD); - writer.writeUTF8(key.getHttpMethod()); + writer.writeUTF8(entry.getHttpMethod()); } // Only include HTTPEndpoint if present if (hasHttpEndpoint) { writer.writeUTF8(HTTP_ENDPOINT); - writer.writeUTF8(key.getHttpEndpoint()); + writer.writeUTF8(entry.getHttpEndpoint()); } // Only include GRPCStatusCode if present (rpc-type spans) if (hasGrpcStatusCode) { writer.writeUTF8(GRPC_STATUS_CODE); - writer.writeUTF8(key.getGrpcStatusCode()); + writer.writeUTF8(entry.getGrpcStatusCode()); } writer.writeUTF8(HITS); - writer.writeInt(aggregate.getHitCount()); + writer.writeInt(entry.getHitCount()); writer.writeUTF8(ERRORS); - writer.writeInt(aggregate.getErrorCount()); + writer.writeInt(entry.getErrorCount()); writer.writeUTF8(TOP_LEVEL_HITS); - writer.writeInt(aggregate.getTopLevelCount()); + writer.writeInt(entry.getTopLevelCount()); writer.writeUTF8(DURATION); - writer.writeLong(aggregate.getDuration()); + writer.writeLong(entry.getDuration()); writer.writeUTF8(OK_SUMMARY); - writer.writeBinary(aggregate.getOkLatencies().serialize()); + writer.writeBinary(entry.getOkLatencies().serialize()); writer.writeUTF8(ERROR_SUMMARY); - writer.writeBinary(aggregate.getErrorLatencies().serialize()); + writer.writeBinary(entry.getErrorLatencies().serialize()); } @Override diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java index eb9b741cea6..152ac42bb55 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java @@ -2,7 +2,8 @@ /** * Immutable per-span value posted from the producer to the aggregator thread. Carries the raw - * inputs the aggregator needs to build a {@link MetricKey} and update an {@link AggregateMetric}. + * inputs the aggregator needs to look up or build an {@link AggregateEntry} and update its + * counters. * *

All cache-canonicalization (service-name, span-kind, peer-tag string interning) happens on the * aggregator thread; the producer just shuffles references. diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy deleted file mode 100644 index 140149d8324..00000000000 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy +++ /dev/null @@ -1,105 +0,0 @@ -package datadog.trace.common.metrics - -import datadog.metrics.agent.AgentMeter -import datadog.metrics.impl.DDSketchHistograms -import datadog.metrics.impl.MonitoringImpl -import datadog.metrics.api.statsd.StatsDClient -import datadog.trace.test.util.DDSpecification - -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicLongArray - -import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG -import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG - -class AggregateMetricTest extends DDSpecification { - - def setupSpec() { - // Initialize AgentMeter with monitoring - this is the standard mechanism used in production - def monitoring = new MonitoringImpl(StatsDClient.NO_OP, 1, TimeUnit.SECONDS) - AgentMeter.registerIfAbsent(StatsDClient.NO_OP, monitoring, DDSketchHistograms.FACTORY) - // Create a timer to trigger DDSketchHistograms loading and Factory registration - // This simulates what happens during CoreTracer initialization (traceWriteTimer) - monitoring.newTimer("test.init") - } - - def "record durations sums up to total"() { - given: - AggregateMetric aggregate = new AggregateMetric() - when: - aggregate.recordDurations(3, new AtomicLongArray(1, 2, 3)) - then: - aggregate.getDuration() == 6 - } - - def "total durations include errors"() { - given: - AggregateMetric aggregate = new AggregateMetric() - when: - aggregate.recordDurations(3, new AtomicLongArray(1, 2, 3)) - then: - aggregate.getDuration() == 6 - } - - def "clear"() { - given: - AggregateMetric aggregate = new AggregateMetric() - .recordDurations(3, new AtomicLongArray(5, ERROR_TAG | 6, TOP_LEVEL_TAG | 7)) - when: - aggregate.clear() - then: - aggregate.getDuration() == 0 - aggregate.getErrorCount() == 0 - aggregate.getTopLevelCount() == 0 - aggregate.getHitCount() == 0 - } - - def "recordOneDuration accumulates ok and error and top-level"() { - given: - AggregateMetric aggregate = new AggregateMetric() - .recordOneDuration(10L) - .recordOneDuration(10L | TOP_LEVEL_TAG) - .recordOneDuration(10L | ERROR_TAG) - - expect: - aggregate.getHitCount() == 3 - aggregate.getDuration() == 30 - aggregate.getErrorCount() == 1 - aggregate.getTopLevelCount() == 1 - } - - def "ignore trailing zeros"() { - given: - AggregateMetric aggregate = new AggregateMetric() - when: - aggregate.recordDurations(3, new AtomicLongArray(1, 2, 3, 0, 0, 0)) - then: - aggregate.getDuration() == 6 - aggregate.getHitCount() == 3 - aggregate.getErrorCount() == 0 - } - - def "hit count includes errors"() { - given: - AggregateMetric aggregate = new AggregateMetric() - when: - aggregate.recordDurations(3, new AtomicLongArray(1, 2, 3 | ERROR_TAG)) - then: - aggregate.getHitCount() == 3 - aggregate.getErrorCount() == 1 - } - - def "ok and error durations tracked separately"() { - given: - AggregateMetric aggregate = new AggregateMetric() - when: - aggregate.recordDurations(10, - new AtomicLongArray(1, 100 | ERROR_TAG, 2, 99 | ERROR_TAG, 3, - 98 | ERROR_TAG, 4, 97 | ERROR_TAG)) - then: - def errorLatencies = aggregate.getErrorLatencies() - def okLatencies = aggregate.getOkLatencies() - errorLatencies.getMaxValue() >= 99 - okLatencies.getMaxValue() <= 5 - } -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index a95f6bcbdbc..00bd706b8fb 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -119,23 +119,25 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( - null, - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "baz", - [], - null, - null, - null - ), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 - } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + null, + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "baz", + [], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -165,23 +167,25 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "baz", - [], - null, - null, - null - ), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 - } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "baz", + [], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -216,24 +220,26 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered == statsComputed (statsComputed ? 1 : 0) * writer.startBucket(1, _, _) - (statsComputed ? 1 : 0) * writer.add( - new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - kind, - [], - httpMethod, - httpEndpoint, - null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) + (statsComputed ? 1 : 0) * writer.add({ + AggregateEntryTestUtils.equals(it, + AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + kind, + [], + httpMethod, + httpEndpoint, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getTopLevelCount() == 0 && e.getDuration() == 100 + } (statsComputed ? 1 : 0) * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -293,42 +299,46 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: cycle1Triggered cycle2Triggered - 1 * writer.add( - new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "client", - [UTF8BytesString.create("country:france")], - null, - null, - null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) - 1 * writer.add( - new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "client", - [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")], - null, - null, - null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it, + AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "client", + [UTF8BytesString.create("country:france")], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getTopLevelCount() == 0 && e.getDuration() == 100 + } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it, + AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "client", + [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getTopLevelCount() == 0 && e.getDuration() == 100 + } 2 * writer.finishBucket() >> { latch1.countDown(); latch2.countDown() } cleanup: @@ -358,24 +368,26 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add( - new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - kind, - expectedPeerTags, - null, - null, - null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it, + AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + kind, + expectedPeerTags, + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getTopLevelCount() == 0 && e.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -411,23 +423,25 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "baz", - [], - null, - null, - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100 - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "baz", + [], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getTopLevelCount() == topLevelCount && e.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -470,40 +484,44 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.finishBucket() >> { latch.countDown() } 1 * writer.startBucket(2, _, SECONDS.toNanos(reportingInterval)) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "baz", - [], - null, - null, - null - ), { AggregateMetric value -> - value.getHitCount() == count && value.getDuration() == count * duration - }) - 1 * writer.add(new MetricKey( - "resource2", - "service2", - "operation2", - null, - "type", - HTTP_OK, - false, - false, - "baz", - [], - null, - null, - null - ), { AggregateMetric value -> - value.getHitCount() == count && value.getDuration() == count * duration * 2 - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "baz", + [], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == count && e.getDuration() == count * duration + } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource2", + "service2", + "operation2", + null, + "type", + HTTP_OK, + false, + false, + "baz", + [], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == count && e.getDuration() == count * duration * 2 + } cleanup: aggregator.close() @@ -541,23 +559,25 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should aggregate into single metric" latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "server", - [], - "GET", - "/api/users/:id", - null - ), { AggregateMetric value -> - value.getHitCount() == count && value.getDuration() == count * duration - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + "GET", + "/api/users/:id", + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == count && e.getDuration() == count * duration + } 1 * writer.finishBucket() >> { latch.countDown() } when: "publish spans with different endpoints" @@ -582,57 +602,63 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create separate metrics for each endpoint/method combination" latchTriggered2 1 * writer.startBucket(3, _, _) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "server", - [], - "GET", - "/api/users/:id", - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "server", - [], - "GET", - "/api/orders/:id", - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 2 - }) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "server", - [], - "POST", - "/api/users/:id", - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 3 - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + "GET", + "/api/users/:id", + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration + } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + "GET", + "/api/orders/:id", + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration * 2 + } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + "POST", + "/api/users/:id", + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration * 3 + } 1 * writer.finishBucket() >> { latch2.countDown() } cleanup: @@ -680,74 +706,82 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create 4 separate metrics" latchTriggered 1 * writer.startBucket(4, _, _) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - 200, - false, - false, - "server", - [], - "GET", - "/api/users/:id", - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - 200, - false, - false, - "server", - [], - "POST", - "/api/users/:id", - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 2 - }) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - 404, - false, - false, - "server", - [], - "GET", - "/api/users/:id", - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 3 - }) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - 200, - false, - false, - "server", - [], - "GET", - "/api/orders/:id", - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 4 - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + 200, + false, + false, + "server", + [], + "GET", + "/api/users/:id", + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration + } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + 200, + false, + false, + "server", + [], + "POST", + "/api/users/:id", + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration * 2 + } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + 404, + false, + false, + "server", + [], + "GET", + "/api/users/:id", + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration * 3 + } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + 200, + false, + false, + "server", + [], + "GET", + "/api/orders/:id", + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration * 4 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -784,40 +818,44 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create separate metric keys for spans with and without HTTP tags" latchTriggered 1 * writer.startBucket(2, _, _) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - 200, - false, - false, - "server", - [], - null, - null, - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - 200, - false, - false, - "server", - [], - "GET", - "/api/users/:id", - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 2 - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + 200, + false, + false, + "server", + [], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration + } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + 200, + false, + false, + "server", + [], + "GET", + "/api/users/:id", + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration * 2 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -852,47 +890,54 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create the different metric keys for spans with and without sources" latchTriggered 1 * writer.startBucket(2, _, _) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - "source", - "type", - 200, - false, - false, - "server", - [], - null, - null, - null - ), { AggregateMetric value -> - value.getHitCount() == 2 && value.getDuration() == 2 * duration - }) - 1 * writer.add(new MetricKey( - "resource", - "service", - "operation", - null, - "type", - 200, - false, - false, - "server", - [], - null, - null, - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + "source", + "type", + 200, + false, + false, + "server", + [], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 2 && e.getDuration() == 2 * duration + } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + 200, + false, + false, + "server", + [], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: aggregator.close() } - def "test least recently written to aggregate flushed when size limit exceeded"() { + def "new aggregates beyond size limit are dropped when no stale entries can be evicted"() { + // The table only evicts entries with hitCount == 0 to make room. When all entries are live + // (all have been recorded against), an over-cap insert drops the new key rather than evicting + // an established one. This protects the data we've already collected from a burst of new keys. setup: int maxAggregates = 10 MetricWriter writer = Mock(MetricWriter) @@ -916,11 +961,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification { aggregator.report() def latchTriggered = latch.await(2, SECONDS) - then: "the first aggregate should be dropped but the rest reported" + then: "the established service0..service9 are reported; service10 is dropped" latchTriggered 1 * writer.startBucket(10, _, SECONDS.toNanos(reportingInterval)) - for (int i = 1; i < 11; ++i) { - 1 * writer.add(new MetricKey( + for (int i = 0; i < 10; ++i) { + def expected = AggregateEntryTestUtils.of( "resource", "service" + i, "operation", @@ -933,26 +978,28 @@ class ConflatingMetricAggregatorTest extends DDSpecification { [], null, null, - null - ), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - } + null) + 1 * writer.add({ AggregateEntryTestUtils.equals(it, expected) }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration + } } - 0 * writer.add(new MetricKey( - "resource", - "service0", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "baz", - [], - null, - null, - null - ), _) + 0 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service10", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "baz", + [], + null, + null, + null + )) + }) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1067,7 +1114,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey( + def expected = AggregateEntryTestUtils.of( "resource", "service" + i, "operation", @@ -1080,10 +1127,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { [], null, null, - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + null) + 1 * writer.add({ AggregateEntryTestUtils.equals(it, expected) }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration + } } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1102,7 +1149,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(4, _, SECONDS.toNanos(reportingInterval)) for (int i = 1; i < 5; ++i) { - 1 * writer.add(new MetricKey( + def expected = AggregateEntryTestUtils.of( "resource", "service" + i, "operation", @@ -1115,26 +1162,28 @@ class ConflatingMetricAggregatorTest extends DDSpecification { [], null, null, - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + null) + 1 * writer.add({ AggregateEntryTestUtils.equals(it, expected) }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration + } } - 0 * writer.add(new MetricKey( - "resource", - "service0", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "baz", - [], - null, - null, - null - ), _) + 0 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "resource", + "service0", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "baz", + [], + null, + null, + null + )) + }) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1169,7 +1218,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey( + def expected = AggregateEntryTestUtils.of( "resource", "service" + i, "operation", @@ -1182,10 +1231,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { [], null, null, - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + null) + 1 * writer.add({ AggregateEntryTestUtils.equals(it, expected) }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration + } } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1195,7 +1244,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "aggregate not updated in cycle is not reported" 0 * writer.finishBucket() 0 * writer.startBucket(_, _, _) - 0 * writer.add(_, _) + 0 * writer.add(_) cleanup: aggregator.close() @@ -1228,7 +1277,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(1)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey( + def expected = AggregateEntryTestUtils.of( "resource", "service" + i, "operation", @@ -1241,10 +1290,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { [], null, null, - null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + null) + 1 * writer.add({ AggregateEntryTestUtils.equals(it, expected) }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getDuration() == duration + } } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1394,24 +1443,26 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add( - new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - true, - "", - [], - null, - null, - null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 100 - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it, + AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + true, + "", + [], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1449,24 +1500,26 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "all spans should go to the same bucket (httpMethod and httpEndpoint are ignored)" latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add( - new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "server", - [], - null, - null, - null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 3 && aggregateMetric.getTopLevelCount() == 3 && aggregateMetric.getDuration() == 450 - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it, + AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 3 && e.getTopLevelCount() == 3 && e.getDuration() == 450 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1504,60 +1557,66 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "spans should go to separate buckets based on httpMethod and httpEndpoint" latchTriggered 1 * writer.startBucket(3, _, _) - 1 * writer.add( - new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "server", - [], - "GET", - "/api/users/:id", - null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 100 - }) - 1 * writer.add( - new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "server", - [], - "POST", - "/api/orders", - null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 200 - }) - 1 * writer.add( - new MetricKey( - "resource", - "service", - "operation", - null, - "type", - HTTP_OK, - false, - false, - "server", - [], - null, - null, - null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 150 - }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it, + AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + "GET", + "/api/users/:id", + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 100 + } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it, + AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + "POST", + "/api/orders", + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 200 + } + 1 * writer.add({ + AggregateEntryTestUtils.equals(it, + AggregateEntryTestUtils.of( + "resource", + "service", + "operation", + null, + "type", + HTTP_OK, + false, + false, + "server", + [], + null, + null, + null + )) + }) >> { AggregateEntry e -> + assert e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 150 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1593,51 +1652,57 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(3, _, _) - 1 * writer.add(new MetricKey( - "grpc.service/Method", - "service", - "grpc.server", - null, - "rpc", - 0, - false, - false, - "server", - [], - null, - null, - "0" - ), _) - 1 * writer.add(new MetricKey( - "grpc.service/Method", - "service", - "grpc.server", - null, - "rpc", - 0, - false, - false, - "server", - [], - null, - null, - "5" - ), _) - 1 * writer.add(new MetricKey( - "GET /api", - "service", - "http.request", - null, - "web", - 200, - false, - false, - "server", - [], - null, - null, - null - ), _) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "grpc.service/Method", + "service", + "grpc.server", + null, + "rpc", + 0, + false, + false, + "server", + [], + null, + null, + "0" + )) + }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "grpc.service/Method", + "service", + "grpc.server", + null, + "rpc", + 0, + false, + false, + "server", + [], + null, + null, + "5" + )) + }) + 1 * writer.add({ + AggregateEntryTestUtils.equals(it,AggregateEntryTestUtils.of( + "GET /api", + "service", + "http.request", + null, + "web", + 200, + false, + false, + "server", + [], + null, + null, + null + )) + }) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index 3ff81de9851..cc0880bc30a 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -7,7 +7,6 @@ import static java.util.concurrent.TimeUnit.SECONDS import datadog.metrics.api.Histograms import datadog.metrics.impl.DDSketchHistograms import datadog.trace.api.Config -import datadog.trace.api.Pair import datadog.trace.api.ProcessTags import datadog.trace.api.WellKnownTags import datadog.trace.api.git.CommitInfo @@ -16,7 +15,6 @@ import datadog.trace.api.git.GitInfoProvider import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.test.util.DDSpecification import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicLongArray import org.msgpack.core.MessagePack import org.msgpack.core.MessageUnpacker @@ -26,6 +24,30 @@ class SerializingMetricWriterTest extends DDSpecification { Histograms.register(DDSketchHistograms.FACTORY) } + /** Build an {@link AggregateEntry} with a pre-recorded duration count. */ + private static AggregateEntry entry( + CharSequence resource, + CharSequence service, + CharSequence operationName, + CharSequence serviceSource, + CharSequence type, + int httpStatusCode, + boolean synthetic, + boolean traceRoot, + CharSequence spanKind, + List peerTags, + CharSequence httpMethod, + CharSequence httpEndpoint, + CharSequence grpcStatusCode, + int hitCount) { + AggregateEntry e = AggregateEntryTestUtils.of( + resource, service, operationName, serviceSource, type, + httpStatusCode, synthetic, traceRoot, spanKind, peerTags, + httpMethod, httpEndpoint, grpcStatusCode) + hitCount.times { e.recordOneDuration(1L) } + return e + } + def "should produce correct message #iterationIndex with process tags enabled #withProcessTags" () { setup: if (!withProcessTags) { @@ -40,8 +62,8 @@ class SerializingMetricWriterTest extends DDSpecification { when: writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) + for (AggregateEntry e : content) { + writer.add(e) } writer.finishBucket() @@ -55,88 +77,40 @@ class SerializingMetricWriterTest extends DDSpecification { where: content << [ [ - Pair.of( - new MetricKey( - "resource1", - "service1", - "operation1", - null, - "type", - 0, - false, - false, - "client", + entry( + "resource1", "service1", "operation1", null, "type", 0, + false, false, "client", [ UTF8BytesString.create("country:canada"), UTF8BytesString.create("georegion:amer"), UTF8BytesString.create("peer.service:remote-service") ], - null, - null, - null - ), - new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) - ), - Pair.of( - new MetricKey( - "resource2", - "service2", - "operation2", - null, - "type2", - 200, - true, - false, - "producer", + null, null, null, + 10), + entry( + "resource2", "service2", "operation2", null, "type2", 200, + true, false, "producer", [ UTF8BytesString.create("country:canada"), UTF8BytesString.create("georegion:amer"), UTF8BytesString.create("peer.service:remote-service") ], - null, - null, - null - ), - new AggregateMetric().recordDurations(9, new AtomicLongArray(1L)) - ), - Pair.of( - new MetricKey( - "GET /api/users/:id", - "web-service", - "http.request", - null, - "web", - 200, - false, - true, - "server", + null, null, null, + 9), + entry( + "GET /api/users/:id", "web-service", "http.request", null, "web", 200, + false, true, "server", [], - "GET", - "/api/users/:id", - null - ), - new AggregateMetric().recordDurations(5, new AtomicLongArray(1L)) - ) + null, null, null, + 5) ], (0..10000).collect({ i -> - Pair.of( - new MetricKey( - "resource" + i, - "service" + i, - "operation" + i, - null, - "type", - 0, - false, - false, - "producer", + entry( + "resource" + i, "service" + i, "operation" + i, null, "type", 0, + false, false, "producer", [UTF8BytesString.create("messaging.destination:dest" + i)], - null, - null, - null - ), - new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) - ) + null, null, null, + 10) }) ] withProcessTags << [true, false] @@ -148,22 +122,18 @@ class SerializingMetricWriterTest extends DDSpecification { long duration = SECONDS.toNanos(10) WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - // Create keys with different combinations of HTTP fields - def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null) + def entryNoSource = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1) + def entryWithSource = entry("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null, 1) - def content = [ - Pair.of(keyWithNoSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - ] + def content = [entryNoSource, entryWithSource] ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) when: writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) + for (AggregateEntry e : content) { + writer.add(e) } writer.finishBucket() @@ -177,34 +147,25 @@ class SerializingMetricWriterTest extends DDSpecification { long duration = SECONDS.toNanos(10) WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - // Create keys with different combinations of HTTP fields - def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null,null) - def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders",null) - def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null) - - def content = [ - Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithMethodOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithEndpointOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithNeither, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))) - ] + def entryWithBoth = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1) + def entryWithMethodOnly = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null, 1) + def entryWithEndpointOnly = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null, 1) + def entryWithNeither = entry("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null, 1) + + def content = [entryWithBoth, entryWithMethodOnly, entryWithEndpointOnly, entryWithNeither] ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) when: writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) + for (AggregateEntry e : content) { + writer.add(e) } writer.finishBucket() then: sink.validatedInput() - // Test passes if validation in ValidatingSink succeeds - // ValidatingSink verifies that map size matches actual number of fields - // and that HTTPMethod/HTTPEndpoint are only present when non-empty } def "add git sha commit info when sha commit is #shaCommit"() { @@ -216,40 +177,63 @@ class SerializingMetricWriterTest extends DDSpecification { long duration = SECONDS.toNanos(10) WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - // Create keys with different combinations of HTTP fields - def key = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) + def e = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1) - def content = [Pair.of(key, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),] + def content = [e] ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128, gitInfoProvider) when: - writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) + for (AggregateEntry entryItem : content) { + writer.add(entryItem) } writer.finishBucket() then: - sink.validatedInput() where: shaCommit << [null, "123456"] } + def "GRPCStatusCode field is present in payload for rpc-type spans"() { + setup: + long startTime = MILLISECONDS.toNanos(System.currentTimeMillis()) + long duration = SECONDS.toNanos(10) + WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") + + def entryWithGrpc = entry("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "server", [], null, null, "OK", 1) + def entryWithGrpcError = entry("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "client", [], null, null, "NOT_FOUND", 1) + def entryWithoutGrpc = entry("resource", "service", "operation", null, "web", 200, false, false, "server", [], null, null, null, 1) + + def content = [entryWithGrpc, entryWithGrpcError, entryWithoutGrpc] + + ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) + SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) + + when: + writer.startBucket(content.size(), startTime, duration) + for (AggregateEntry e : content) { + writer.add(e) + } + writer.finishBucket() + + then: + sink.validatedInput() + } + static class ValidatingSink implements Sink { private final WellKnownTags wellKnownTags private final long startTimeNanos private final long duration private boolean validated = false - private List> content + private List content ValidatingSink(WellKnownTags wellKnownTags, long startTimeNanos, long duration, - List> content) { + List content) { this.wellKnownTags = wellKnownTags this.startTimeNanos = startTimeNanos this.duration = duration @@ -298,83 +282,81 @@ class SerializingMetricWriterTest extends DDSpecification { assert unpacker.unpackString() == "Stats" int statCount = unpacker.unpackArrayHeader() assert statCount == content.size() - for (Pair pair : content) { - MetricKey key = pair.getLeft() - AggregateMetric value = pair.getRight() + for (AggregateEntry entry : content) { int metricMapSize = unpacker.unpackMapHeader() // Calculate expected map size based on optional fields - boolean hasHttpMethod = key.getHttpMethod() != null - boolean hasHttpEndpoint = key.getHttpEndpoint() != null - boolean hasServiceSource = key.getServiceSource() != null - boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null + boolean hasHttpMethod = entry.getHttpMethod() != null + boolean hasHttpEndpoint = entry.getHttpEndpoint() != null + boolean hasServiceSource = entry.getServiceSource() != null + boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != null int expectedMapSize = 15 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) + (hasGrpcStatusCode ? 1 : 0) assert metricMapSize == expectedMapSize int elementCount = 0 assert unpacker.unpackString() == "Name" - assert unpacker.unpackString() == key.getOperationName() as String + assert unpacker.unpackString() == entry.getOperationName() as String ++elementCount assert unpacker.unpackString() == "Service" - assert unpacker.unpackString() == key.getService() as String + assert unpacker.unpackString() == entry.getService() as String ++elementCount assert unpacker.unpackString() == "Resource" - assert unpacker.unpackString() == key.getResource() as String + assert unpacker.unpackString() == entry.getResource() as String ++elementCount assert unpacker.unpackString() == "Type" - assert unpacker.unpackString() == key.getType() as String + assert unpacker.unpackString() == entry.getType() as String ++elementCount assert unpacker.unpackString() == "HTTPStatusCode" - assert unpacker.unpackInt() == key.getHttpStatusCode() + assert unpacker.unpackInt() == entry.getHttpStatusCode() ++elementCount assert unpacker.unpackString() == "Synthetics" - assert unpacker.unpackBoolean() == key.isSynthetics() + assert unpacker.unpackBoolean() == entry.isSynthetics() ++elementCount assert unpacker.unpackString() == "IsTraceRoot" - assert unpacker.unpackInt() == (key.isTraceRoot() ? TriState.TRUE.serialValue : TriState.FALSE.serialValue) + assert unpacker.unpackInt() == (entry.isTraceRoot() ? TriState.TRUE.serialValue : TriState.FALSE.serialValue) ++elementCount assert unpacker.unpackString() == "SpanKind" - assert unpacker.unpackString() == key.getSpanKind() as String + assert unpacker.unpackString() == entry.getSpanKind() as String ++elementCount assert unpacker.unpackString() == "PeerTags" int peerTagsLength = unpacker.unpackArrayHeader() - assert peerTagsLength == key.getPeerTags().size() + assert peerTagsLength == entry.getPeerTags().size() for (int i = 0; i < peerTagsLength; i++) { def unpackedPeerTag = unpacker.unpackString() - assert unpackedPeerTag == key.getPeerTags()[i].toString() + assert unpackedPeerTag == entry.getPeerTags()[i].toString() } ++elementCount // Service source is only present when the service name has been overridden by the tracer if (hasServiceSource) { assert unpacker.unpackString() == "srv_src" - assert unpacker.unpackString() == key.getServiceSource().toString() + assert unpacker.unpackString() == entry.getServiceSource().toString() ++elementCount } // HTTPMethod and HTTPEndpoint are optional - only present if non-null if (hasHttpMethod) { assert unpacker.unpackString() == "HTTPMethod" - assert unpacker.unpackString() == key.getHttpMethod() as String + assert unpacker.unpackString() == entry.getHttpMethod() as String ++elementCount } if (hasHttpEndpoint) { assert unpacker.unpackString() == "HTTPEndpoint" - assert unpacker.unpackString() == key.getHttpEndpoint() as String + assert unpacker.unpackString() == entry.getHttpEndpoint() as String ++elementCount } if (hasGrpcStatusCode) { assert unpacker.unpackString() == "GRPCStatusCode" - assert unpacker.unpackString() == key.getGrpcStatusCode() as String + assert unpacker.unpackString() == entry.getGrpcStatusCode() as String ++elementCount } assert unpacker.unpackString() == "Hits" - assert unpacker.unpackInt() == value.getHitCount() + assert unpacker.unpackInt() == entry.getHitCount() ++elementCount assert unpacker.unpackString() == "Errors" - assert unpacker.unpackInt() == value.getErrorCount() + assert unpacker.unpackInt() == entry.getErrorCount() ++elementCount assert unpacker.unpackString() == "TopLevelHits" - assert unpacker.unpackInt() == value.getTopLevelCount() + assert unpacker.unpackInt() == entry.getTopLevelCount() ++elementCount assert unpacker.unpackString() == "Duration" - assert unpacker.unpackLong() == value.getDuration() + assert unpacker.unpackLong() == entry.getDuration() ++elementCount assert unpacker.unpackString() == "OkSummary" validateSketch(unpacker) @@ -397,99 +379,4 @@ class SerializingMetricWriterTest extends DDSpecification { return validated } } - - def "ServiceSource optional in the payload"() { - setup: - long startTime = MILLISECONDS.toNanos(System.currentTimeMillis()) - long duration = SECONDS.toNanos(10) - WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - - // Create keys with different combinations of HTTP fields - def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null) - - def content = [ - Pair.of(keyWithNoSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - ] - - ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) - SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) - - when: - writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) - } - writer.finishBucket() - - then: - sink.validatedInput() - } - - def "GRPCStatusCode field is present in payload for rpc-type spans"() { - setup: - long startTime = MILLISECONDS.toNanos(System.currentTimeMillis()) - long duration = SECONDS.toNanos(10) - WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - - def keyWithGrpc = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "server", [], null, null, "OK") - def keyWithGrpcError = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "client", [], null, null, "NOT_FOUND") - def keyWithoutGrpc = new MetricKey("resource", "service", "operation", null, "web", 200, false, false, "server", [], null, null, null) - - def content = [ - Pair.of(keyWithGrpc, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithGrpcError, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithoutGrpc, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))) - ] - - ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) - SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) - - when: - writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) - } - writer.finishBucket() - - then: - sink.validatedInput() - } - - def "HTTPMethod and HTTPEndpoint fields are optional in payload"() { - setup: - long startTime = MILLISECONDS.toNanos(System.currentTimeMillis()) - long duration = SECONDS.toNanos(10) - WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - - // Create keys with different combinations of HTTP fields - def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null) - def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null) - def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null) - - def content = [ - Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithMethodOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithEndpointOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithNeither, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))) - ] - - ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) - SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) - - when: - writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) - } - writer.finishBucket() - - then: - sink.validatedInput() - // Test passes if validation in ValidatingSink succeeds - // ValidatingSink verifies that map size matches actual number of fields - // and that HTTPMethod/HTTPEndpoint are only present when non-empty - } } diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java new file mode 100644 index 00000000000..7fd767533c7 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java @@ -0,0 +1,165 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.common.metrics.AggregateEntry.ERROR_TAG; +import static datadog.trace.common.metrics.AggregateEntry.TOP_LEVEL_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.metrics.agent.AgentMeter; +import datadog.metrics.api.statsd.StatsDClient; +import datadog.metrics.impl.DDSketchHistograms; +import datadog.metrics.impl.MonitoringImpl; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class AggregateEntryTest { + + @BeforeAll + static void initAgentMeter() { + // recordOneDuration -> Histogram.accept needs AgentMeter to be initialized. + MonitoringImpl monitoring = new MonitoringImpl(StatsDClient.NO_OP, 1, TimeUnit.SECONDS); + AgentMeter.registerIfAbsent(StatsDClient.NO_OP, monitoring, DDSketchHistograms.FACTORY); + monitoring.newTimer("test.init"); + } + + @Test + void recordOneDurationSumsToTotal() { + AggregateEntry entry = newEntry(); + entry.recordOneDuration(1L); + entry.recordOneDuration(2L); + entry.recordOneDuration(3L); + assertEquals(6, entry.getDuration()); + } + + @Test + void clearResetsAllCounters() { + AggregateEntry entry = newEntry(); + entry.recordOneDuration(5L); + entry.recordOneDuration(ERROR_TAG | 6L); + entry.recordOneDuration(TOP_LEVEL_TAG | 7L); + entry.clear(); + assertEquals(0, entry.getDuration()); + assertEquals(0, entry.getErrorCount()); + assertEquals(0, entry.getTopLevelCount()); + assertEquals(0, entry.getHitCount()); + } + + @Test + void recordOneDurationAccumulatesOkErrorAndTopLevel() { + AggregateEntry entry = newEntry(); + entry.recordOneDuration(10L); + entry.recordOneDuration(10L | TOP_LEVEL_TAG); + entry.recordOneDuration(10L | ERROR_TAG); + + assertEquals(3, entry.getHitCount()); + assertEquals(30, entry.getDuration()); + assertEquals(1, entry.getErrorCount()); + assertEquals(1, entry.getTopLevelCount()); + } + + @Test + void hitCountIncludesErrors() { + AggregateEntry entry = newEntry(); + entry.recordOneDuration(1L); + entry.recordOneDuration(2L); + entry.recordOneDuration(3L | ERROR_TAG); + assertEquals(3, entry.getHitCount()); + assertEquals(1, entry.getErrorCount()); + } + + @Test + void okAndErrorLatenciesTrackedSeparately() { + AggregateEntry entry = newEntry(); + long[] durations = { + 1L, 100L | ERROR_TAG, 2L, 99L | ERROR_TAG, 3L, 98L | ERROR_TAG, 4L, 97L | ERROR_TAG + }; + for (long d : durations) { + entry.recordOneDuration(d); + } + assertTrue(entry.getErrorLatencies().getMaxValue() >= 99); + assertTrue(entry.getOkLatencies().getMaxValue() <= 5); + } + + @Test + void testUtilsEqualsIsConsistentWithHashCodeAcrossDifferentSchemaLayouts() { + // Contract test for AggregateEntryTestUtils (the test-side equality helper used by Spock + // mock matchers). Production AggregateEntry has no equals override. + // + // Two entries with identical encoded peerTags but different raw layouts must not be equal, + // because hashOf folds in the raw arrays. Equality on the encoded list would let them + // collapse while their hashCodes differ -- violating the contract. + // + // A: schema ["a","b"], values [null,"x"] -> encoded ["b:x"] + // B: schema ["b","c"], values ["x",null] -> encoded ["b:x"] + AggregateEntry a = + AggregateEntryTestUtils.forSnapshot( + snapshotWithPeerTags(new String[] {"a", "b"}, new String[] {null, "x"})); + AggregateEntry b = + AggregateEntryTestUtils.forSnapshot( + snapshotWithPeerTags(new String[] {"b", "c"}, new String[] {"x", null})); + + // Sanity: same encoded peer tags, despite different raw layout. + assertEquals(a.getPeerTags(), b.getPeerTags()); + + // Different raw layouts -> entries must not be equal via the test helper. + assertFalse(AggregateEntryTestUtils.equals(a, b)); + // And different hashCodes (matching the inequality). + assertNotEquals(AggregateEntryTestUtils.hashCode(a), AggregateEntryTestUtils.hashCode(b)); + } + + @Test + void testUtilsEqualEntriesHaveEqualHashCodes() { + AggregateEntry a = + AggregateEntryTestUtils.forSnapshot( + snapshotWithPeerTags(new String[] {"a", "b"}, new String[] {null, "x"})); + AggregateEntry b = + AggregateEntryTestUtils.forSnapshot( + snapshotWithPeerTags(new String[] {"a", "b"}, new String[] {null, "x"})); + + assertTrue(AggregateEntryTestUtils.equals(a, b)); + assertEquals(AggregateEntryTestUtils.hashCode(a), AggregateEntryTestUtils.hashCode(b)); + } + + private static SpanSnapshot snapshotWithPeerTags(String[] names, String[] values) { + return new SpanSnapshot( + "resource", + "svc", + "op", + null, + "type", + (short) 200, + false, + true, + "client", + PeerTagSchema.testSchema(names), + values, + null, + null, + null, + 0L); + } + + private static AggregateEntry newEntry() { + SpanSnapshot snapshot = + new SpanSnapshot( + "resource", + "svc", + "op", + null, + "type", + (short) 200, + false, + true, + "client", + null, + null, + null, + null, + null, + 0L); + return AggregateEntryTestUtils.forSnapshot(snapshot); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTestUtils.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTestUtils.java new file mode 100644 index 00000000000..ed6fd5a3a7e --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTestUtils.java @@ -0,0 +1,132 @@ +package datadog.trace.common.metrics; + +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; + +/** + * Test-side helpers for {@link AggregateEntry}: a positional-args fixture factory plus a field-wise + * equality contract for use with Spock mock argument matchers and JUnit assertions. Lives in {@code + * src/test} so the production class stays free of test-only API; same {@code + * datadog.trace.common.metrics} package so this helper can reach package-private fields and + * constructors. + * + *

Production {@code AggregateEntry} intentionally has no {@code equals}/{@code hashCode} + * override -- {@link AggregateTable} bucketing goes through {@link AggregateEntry#matches} keyed on + * {@link AggregateEntry#keyHash}, and no production code path invokes {@link Object#equals}. + * + *

The equality helper compares the raw {@code peerTagNames}/{@code peerTagValues} arrays (not + * the encoded {@code peerTags} list) so it stays consistent with {@link AggregateEntry#hashOf}, + * which folds in raw arrays via {@link PeerTagSchema#hashCode()} and {@link + * Arrays#hashCode(Object[])}. Comparing the encoded list would let two entries with different raw + * layouts (e.g. tag {@code "b"} at index 1 in schema A vs index 0 in schema B, with matching + * values) collapse to the same encoded form -- a real bug surfaced during PR #11382 review. + */ +public final class AggregateEntryTestUtils { + private AggregateEntryTestUtils() {} + + /** + * Builds an {@link AggregateEntry} from the same positional shape the prior {@code new + * MetricKey(...)} took. Accepts a pre-encoded {@code List} of {@code + * "name:value"} peer tags and recovers the parallel-array {@code (names, values)} form by + * splitting on the {@code ':'} delimiter. + * + *

Test-only. The split is at the first {@code ':'}, so peer-tag values + * containing a colon (URLs, IPv6 addresses, {@code service:env} patterns) will be silently + * misparsed and the recovered (name, value) pair will be wrong. Keep test data colon-free in + * peer-tag values, or wire a production-style snapshot through {@link #forSnapshot(SpanSnapshot)} + * directly instead. + */ + public static AggregateEntry of( + CharSequence resource, + CharSequence service, + CharSequence operationName, + @Nullable CharSequence serviceSource, + CharSequence type, + int httpStatusCode, + boolean synthetic, + boolean traceRoot, + CharSequence spanKind, + @Nullable List peerTags, + @Nullable CharSequence httpMethod, + @Nullable CharSequence httpEndpoint, + @Nullable CharSequence grpcStatusCode) { + PeerTagSchema schema = null; + String[] values = null; + if (peerTags != null && !peerTags.isEmpty()) { + String[] names = new String[peerTags.size()]; + values = new String[peerTags.size()]; + int i = 0; + for (UTF8BytesString t : peerTags) { + String s = t.toString(); + int colon = s.indexOf(':'); + names[i] = colon < 0 ? s : s.substring(0, colon); + values[i] = colon < 0 ? "" : s.substring(colon + 1); + i++; + } + schema = PeerTagSchema.testSchema(names); + } + SpanSnapshot syntheticSnapshot = + new SpanSnapshot( + resource, + service == null ? null : service.toString(), + operationName, + serviceSource, + type, + (short) httpStatusCode, + synthetic, + traceRoot, + spanKind == null ? null : spanKind.toString(), + schema, + values, + httpMethod == null ? null : httpMethod.toString(), + httpEndpoint == null ? null : httpEndpoint.toString(), + grpcStatusCode == null ? null : grpcStatusCode.toString(), + 0L); + return forSnapshot(syntheticSnapshot); + } + + /** + * Builds an {@link AggregateEntry} from {@code s} by computing its lookup hash via {@link + * AggregateEntry#hashOf(SpanSnapshot)} and calling the package-private constructor directly. + * Production callers route through {@link AggregateTable#findOrInsert} which already has the + * {@code keyHash} on hand; tests rarely do, so this helper hides the second argument. + */ + public static AggregateEntry forSnapshot(SpanSnapshot s) { + return new AggregateEntry(s, AggregateEntry.hashOf(s)); + } + + /** + * Whether {@code a} and {@code b} carry identical label fields. Counter and histogram state is + * intentionally excluded -- this compares the key identity, not the aggregate. + */ + public static boolean equals(AggregateEntry a, AggregateEntry b) { + if (a == b) return true; + if (a == null || b == null) return false; + return a.getHttpStatusCode() == b.getHttpStatusCode() + && a.isSynthetics() == b.isSynthetics() + && a.isTraceRoot() == b.isTraceRoot() + && Objects.equals(a.getResource(), b.getResource()) + && Objects.equals(a.getService(), b.getService()) + && Objects.equals(a.getOperationName(), b.getOperationName()) + && Objects.equals(a.getServiceSource(), b.getServiceSource()) + && Objects.equals(a.getType(), b.getType()) + && Objects.equals(a.getSpanKind(), b.getSpanKind()) + && Arrays.equals(a.peerTagNames, b.peerTagNames) + && Arrays.equals(a.peerTagValues, b.peerTagValues) + && Objects.equals(a.getHttpMethod(), b.getHttpMethod()) + && Objects.equals(a.getHttpEndpoint(), b.getHttpEndpoint()) + && Objects.equals(a.getGrpcStatusCode(), b.getGrpcStatusCode()); + } + + /** + * Stable hash matching {@link #equals(AggregateEntry, AggregateEntry)} -- derived from {@link + * AggregateEntry#keyHash}, which {@link AggregateEntry#hashOf} computes from the same raw fields + * the helper's {@code equals} compares. + */ + public static int hashCode(AggregateEntry e) { + return e == null ? 0 : (int) e.keyHash; + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java new file mode 100644 index 00000000000..618ead2ab43 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -0,0 +1,364 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.common.metrics.AggregateEntry.ERROR_TAG; +import static datadog.trace.common.metrics.AggregateEntry.TOP_LEVEL_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.metrics.agent.AgentMeter; +import datadog.metrics.api.statsd.StatsDClient; +import datadog.metrics.impl.DDSketchHistograms; +import datadog.metrics.impl.MonitoringImpl; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class AggregateTableTest { + + @BeforeAll + static void initAgentMeter() { + // AggregateEntry.recordOneDuration -> Histogram.accept needs AgentMeter to be initialized. + MonitoringImpl monitoring = new MonitoringImpl(StatsDClient.NO_OP, 1, TimeUnit.SECONDS); + AgentMeter.registerIfAbsent(StatsDClient.NO_OP, monitoring, DDSketchHistograms.FACTORY); + monitoring.newTimer("test.init"); + } + + @Test + void insertOnMissReturnsNewAggregate() { + AggregateTable table = new AggregateTable(8); + SpanSnapshot s = snapshot("svc", "op", "client"); + + AggregateEntry agg = table.findOrInsert(s); + + assertNotNull(agg); + assertEquals(1, table.size()); + assertEquals(0, agg.getHitCount()); + } + + @Test + void hitReturnsSameAggregateInstance() { + AggregateTable table = new AggregateTable(8); + SpanSnapshot s1 = snapshot("svc", "op", "client"); + SpanSnapshot s2 = snapshot("svc", "op", "client"); + + AggregateEntry first = table.findOrInsert(s1); + AggregateEntry second = table.findOrInsert(s2); + + assertSame(first, second); + assertEquals(1, table.size()); + } + + @Test + void differentKindFieldsAreDistinct() { + AggregateTable table = new AggregateTable(8); + + AggregateEntry clientAgg = table.findOrInsert(snapshot("svc", "op", "client")); + AggregateEntry serverAgg = table.findOrInsert(snapshot("svc", "op", "server")); + + assertNotSame(clientAgg, serverAgg); + assertEquals(2, table.size()); + } + + @Test + void peerTagPairsParticipateInIdentity() { + AggregateTable table = new AggregateTable(8); + SpanSnapshot withTags = + builder("svc", "op", "client").peerTags("peer.hostname", "host-a").build(); + SpanSnapshot otherTags = + builder("svc", "op", "client").peerTags("peer.hostname", "host-b").build(); + SpanSnapshot noTags = builder("svc", "op", "client").build(); + + AggregateEntry a = table.findOrInsert(withTags); + AggregateEntry b = table.findOrInsert(otherTags); + AggregateEntry c = table.findOrInsert(noTags); + + assertNotSame(a, b); + assertNotSame(a, c); + assertNotSame(b, c); + assertEquals(3, table.size()); + } + + @Test + void capOverrunEvictsStaleEntry() { + AggregateTable table = new AggregateTable(2); + + AggregateEntry stale = table.findOrInsert(snapshot("svc-a", "op", "client")); + // do not record on stale -> hitCount stays at 0 + + AggregateEntry live = table.findOrInsert(snapshot("svc-b", "op", "client")); + live.recordOneDuration(10L | TOP_LEVEL_TAG); // hitCount=1, not evictable + + // table is full (size=2). Inserting a third should evict the stale one and succeed. + AggregateEntry newcomer = table.findOrInsert(snapshot("svc-c", "op", "client")); + assertNotNull(newcomer); + assertEquals(2, table.size()); + + // re-inserting the stale snapshot should miss now (it was evicted) and produce a fresh entry + AggregateEntry staleAgain = table.findOrInsert(snapshot("svc-a", "op", "client")); + assertNotSame(stale, staleAgain); + } + + @Test + void backToBackEvictionsAllSucceed() { + // Cursor amortization regression: cap the table, fill with stale entries, then force a + // sequence of cap-overrun inserts. Each insert must succeed (evicting one stale entry and + // inserting one new). The cursor field is internal, but if it were ever wedged (e.g. + // pointing past the end of buckets, or not advancing after a successful eviction), some + // later insert would fail to find a stale entry. Drives ~3x the capacity worth of inserts to + // give wrap-around plenty of chances to misbehave. + AggregateTable table = new AggregateTable(8); + for (int i = 0; i < 8; i++) { + table.findOrInsert(snapshot("init-" + i, "op", "client")); + } + for (int i = 0; i < 32; i++) { + AggregateEntry inserted = table.findOrInsert(snapshot("post-" + i, "op", "client")); + assertNotNull( + inserted, "insert #" + i + " should evict a stale entry and succeed (table full)"); + } + assertEquals(8, table.size()); + } + + @Test + void clearResetsCursorForSubsequentEvictions() { + // The cursor must reset to 0 on clear so a re-filled table doesn't start eviction at a + // stale bucket index. Verified indirectly: clear and re-fill, then force an eviction; the + // newcomer must successfully take a slot (which only works if a stale entry was found). + AggregateTable table = new AggregateTable(4); + + // Fill, age, evict once -- cursor lands at some non-zero bucket + for (int i = 0; i < 4; i++) { + table.findOrInsert(snapshot("warm-" + i, "op", "client")); + } + table.findOrInsert(snapshot("evict-trigger", "op", "client")); + + table.clear(); + assertEquals(0, table.size()); + + // Re-fill, age, force eviction -- should still find a stale entry from bucket 0 onward + for (int i = 0; i < 4; i++) { + table.findOrInsert(snapshot("fresh-" + i, "op", "client")); + } + AggregateEntry newcomer = table.findOrInsert(snapshot("post-clear", "op", "client")); + assertNotNull(newcomer, "post-clear cap-overrun insert must succeed via cursor-reset evict"); + } + + @Test + void capOverrunWithNoStaleReturnsNull() { + AggregateTable table = new AggregateTable(2); + + AggregateEntry a = table.findOrInsert(snapshot("svc-a", "op", "client")); + AggregateEntry b = table.findOrInsert(snapshot("svc-b", "op", "client")); + a.recordOneDuration(10L); + b.recordOneDuration(20L); + + AggregateEntry c = table.findOrInsert(snapshot("svc-c", "op", "client")); + assertNull(c); + assertEquals(2, table.size()); + } + + @Test + void expungeStaleAggregatesRemovesZeroHitsOnly() { + AggregateTable table = new AggregateTable(16); + + AggregateEntry live = table.findOrInsert(snapshot("svc-live", "op", "client")); + live.recordOneDuration(10L); + AggregateEntry stale1 = table.findOrInsert(snapshot("svc-stale1", "op", "client")); + AggregateEntry stale2 = table.findOrInsert(snapshot("svc-stale2", "op", "client")); + assertEquals(3, table.size()); + assertEquals(0, stale1.getHitCount()); + assertEquals(0, stale2.getHitCount()); + + table.expungeStaleAggregates(); + + assertEquals(1, table.size()); + // the live entry must still be reachable + assertSame(live, table.findOrInsert(snapshot("svc-live", "op", "client"))); + } + + @Test + void forEachVisitsEveryEntry() { + AggregateTable table = new AggregateTable(8); + table.findOrInsert(snapshot("a", "op", "client")).recordOneDuration(1L); + table.findOrInsert(snapshot("b", "op", "client")).recordOneDuration(2L); + table.findOrInsert(snapshot("c", "op", "client")).recordOneDuration(3L | ERROR_TAG); + + Map visited = new HashMap<>(); + table.forEach(e -> visited.put(e.getService().toString(), e.getDuration())); + + assertEquals(3, visited.size()); + assertEquals(1L, visited.get("a")); + assertEquals(2L, visited.get("b")); + assertEquals(3L, visited.get("c")); + } + + @Test + void clearEmptiesTheTable() { + AggregateTable table = new AggregateTable(8); + table.findOrInsert(snapshot("a", "op", "client")); + table.findOrInsert(snapshot("b", "op", "client")); + assertEquals(2, table.size()); + + table.clear(); + + assertTrue(table.isEmpty()); + assertEquals(0, table.size()); + // and re-insertion works after clear + assertNotNull(table.findOrInsert(snapshot("a", "op", "client"))); + } + + @Test + void encodedLabelsAreBuiltOnInsert() { + AggregateTable table = new AggregateTable(4); + List seen = new ArrayList<>(); + table.findOrInsert(snapshot("svc", "op", "client")); + table.forEach(seen::add); + + assertEquals(1, seen.size()); + AggregateEntry e = seen.get(0); + assertEquals("svc", e.getService().toString()); + assertEquals("op", e.getOperationName().toString()); + assertEquals("client", e.getSpanKind().toString()); + } + + @Test + void nullAndEmptyOptionalFieldsCollapseToOneEntry() { + // null and length-zero are treated as equivalent for optional fields, so snapshots that + // differ only in null-vs-"" land on the same entry. + AggregateTable table = new AggregateTable(8); + + SpanSnapshot snapNull = nullableSnapshot(null, null, null, null); + SpanSnapshot snapEmpty = nullableSnapshot("", "", "", ""); + + AggregateEntry first = table.findOrInsert(snapNull); + AggregateEntry secondNull = table.findOrInsert(nullableSnapshot(null, null, null, null)); + AggregateEntry forEmpty = table.findOrInsert(snapEmpty); + + assertSame(first, secondNull, "two null-fielded snapshots must hit the same entry"); + assertSame(first, forEmpty, "null- and empty-fielded snapshots must hit the same entry"); + assertEquals(1, table.size()); + } + + @Test + void nullServiceAndSpanKindDoNotNpeAndCollapseWithEmpty() { + // Null service and spanKind are accepted (canonicalize to length-zero) and collapse with + // empty-string variants onto the same entry. + AggregateTable table = new AggregateTable(8); + + SpanSnapshot allNulls = nullServiceKindSnapshot(null, null); + SpanSnapshot allEmpty = nullServiceKindSnapshot("", ""); + + AggregateEntry first = table.findOrInsert(allNulls); + AggregateEntry secondNull = table.findOrInsert(nullServiceKindSnapshot(null, null)); + AggregateEntry forEmpty = table.findOrInsert(allEmpty); + + assertSame(first, secondNull, "two null-service/-kind snapshots must hit the same entry"); + assertSame(first, forEmpty, "null- and empty-service/-kind snapshots must hit the same entry"); + assertEquals(1, table.size()); + assertEquals(0, first.getService().length(), "null serviceName should canonicalize to EMPTY"); + assertEquals(0, first.getSpanKind().length(), "null spanKind should canonicalize to EMPTY"); + } + + private static SpanSnapshot nullServiceKindSnapshot(String service, String spanKind) { + return new SpanSnapshot( + "resource", + service, + "op", + null, + "web", + (short) 200, + false, + true, + spanKind, + null, + null, + null, + null, + null, + 0L); + } + + private static SpanSnapshot nullableSnapshot( + String resource, String operation, String type, String serviceNameSource) { + return new SpanSnapshot( + resource, + "svc", + operation, + serviceNameSource, + type, + (short) 200, + false, + true, + "client", + null, + null, + null, + null, + null, + 0L); + } + + // ---------- helpers ---------- + + private static SpanSnapshot snapshot(String service, String operation, String spanKind) { + return builder(service, operation, spanKind).build(); + } + + private static SnapshotBuilder builder(String service, String operation, String spanKind) { + return new SnapshotBuilder(service, operation, spanKind); + } + + private static final class SnapshotBuilder { + private final String service; + private final String operation; + private final String spanKind; + private PeerTagSchema peerTagSchema; + private String[] peerTagValues; + private long tagAndDuration = 0L; + + SnapshotBuilder(String service, String operation, String spanKind) { + this.service = service; + this.operation = operation; + this.spanKind = spanKind; + } + + SnapshotBuilder peerTags(String... namesAndValues) { + int pairCount = namesAndValues.length / 2; + String[] names = new String[pairCount]; + String[] values = new String[pairCount]; + for (int i = 0; i < pairCount; i++) { + names[i] = namesAndValues[2 * i]; + values[i] = namesAndValues[2 * i + 1]; + } + this.peerTagSchema = PeerTagSchema.testSchema(names); + this.peerTagValues = values; + return this; + } + + SpanSnapshot build() { + return new SpanSnapshot( + "resource", + service, + operation, + null, + "web", + (short) 200, + false, + true, + spanKind, + peerTagSchema, + peerTagValues, + null, + null, + null, + tagAndDuration); + } + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBootstrapTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBootstrapTest.java index ef07e0fbc19..59681d4724e 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBootstrapTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBootstrapTest.java @@ -217,9 +217,9 @@ void reconcileSurvivesStateChangeWhenTagsUnchanged() throws Exception { void reconcileSwapsSchemaWhenTagSetChanges() throws Exception { // The reconcile slow-path's swap branch: discovery refreshes the state AND the tag set // grows. Cached schema is rebuilt and the volatile reference points at the new schema. - // Verification is end-to-end -- we look at the MetricKey the writer receives. Pre-swap the - // span snapshot was pinned to the old schema so only peer.hostname appears; post-swap a new - // publish reads the new schema and the next flush carries both peer tags. + // Verification is end-to-end -- we look at the AggregateEntry the writer receives. Pre-swap + // the span snapshot was pinned to the old schema so only peer.hostname appears; post-swap a + // new publish reads the new schema and the next flush carries both peer tags. HealthMetrics healthMetrics = mock(HealthMetrics.class); MetricWriter writer = mock(MetricWriter.class); Sink sink = mock(Sink.class); @@ -266,8 +266,8 @@ void reconcileSwapsSchemaWhenTagSetChanges() throws Exception { .finishBucket(); // Publish 1: snapshot pinned to the original {peer.hostname} schema. cycle 1's reconcile - // will swap the cached schema BEFORE the flush, but this snapshot is already pinned so its - // MetricKey will still carry only peer.hostname. + // will swap the cached schema BEFORE the flush, but this snapshot is already pinned so the + // resulting AggregateEntry will still carry only peer.hostname. aggregator.publish( Collections.>singletonList(peerAggregationSpanWithBothPeerTags())); aggregator.report(); @@ -280,20 +280,20 @@ void reconcileSwapsSchemaWhenTagSetChanges() throws Exception { aggregator.report(); assertTrue(cycle2.await(2, SECONDS)); - // Capture every (MetricKey, AggregateMetric) the writer saw across both cycles. Pre-swap - // snapshot has 1 peer tag, post-swap has 2. - ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(MetricKey.class); - verify(writer, times(2)).add(keyCaptor.capture(), any(AggregateMetric.class)); - List keys = keyCaptor.getAllValues(); + // Capture every AggregateEntry the writer saw across both cycles. Pre-swap snapshot has 1 + // peer tag, post-swap has 2. + ArgumentCaptor entryCaptor = ArgumentCaptor.forClass(AggregateEntry.class); + verify(writer, times(2)).add(entryCaptor.capture()); + List entries = entryCaptor.getAllValues(); assertEquals( Collections.singletonList(UTF8BytesString.create("peer.hostname:localhost")), - keys.get(0).getPeerTags(), + entries.get(0).getPeerTags(), "pre-swap snapshot should encode only peer.hostname"); assertEquals( Arrays.asList( UTF8BytesString.create("peer.hostname:localhost"), UTF8BytesString.create("peer.service:billing")), - keys.get(1).getPeerTags(), + entries.get(1).getPeerTags(), "post-swap snapshot should encode both peer.hostname and peer.service"); // Bootstrap (1) + cycle 1 slow-path (1) -- cycle 2 is fast-path so doesn't reach peerTags(). diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDisableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDisableTest.java new file mode 100644 index 00000000000..d072371d25d --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDisableTest.java @@ -0,0 +1,234 @@ +package datadog.trace.common.metrics; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.SpanKindFilter; +import datadog.trace.core.monitor.HealthMetrics; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +/** + * Coverage for the {@code disable() -> ClearSignal.CLEAR} threading routing introduced in this PR. + * + *

The bundled fix routes the agent-downgrade clear through the inbox so the aggregator thread + * stays the sole writer to {@link AggregateTable} (which is not thread-safe). The behavioral + * contract this test pins: + * + *

+ * + *

The test would fail if {@code disable()} reverted to mutating {@code AggregateTable} directly + * (the pre-fix path) only via races -- not deterministically -- so the assertions here are about + * the observable end-to-end shape rather than thread identity. + */ +class ConflatingMetricsAggregatorDisableTest { + + @Test + void downgradeRoutesClearThroughInboxBeforeNextReport() throws Exception { + HealthMetrics healthMetrics = mock(HealthMetrics.class); + MetricWriter writer = mock(MetricWriter.class); + Sink sink = mock(Sink.class); + DDAgentFeaturesDiscovery features = mock(DDAgentFeaturesDiscovery.class); + when(features.supportsMetrics()).thenReturn(true); + when(features.peerTags()).thenReturn(Collections.emptySet()); + when(features.state()).thenReturn("state-1"); + + ConflatingMetricsAggregator aggregator = + new ConflatingMetricsAggregator( + Collections.emptySet(), + features, + healthMetrics, + sink, + writer, + /* maxAggregates */ 16, + /* queueSize */ 64, + /* reportingInterval */ 10, + SECONDS, + /* includeEndpointInMetrics */ false); + aggregator.start(); + try { + // Baseline: publish a span, run a report, verify the table flushes normally. This gives + // us a clean post-first-report state with the aggregator's reconcile already having fired + // once on the aggregator thread. + CountDownLatch firstFlush = new CountDownLatch(1); + org.mockito.Mockito.doAnswer( + invocation -> { + firstFlush.countDown(); + return null; + }) + .when(writer) + .finishBucket(); + + aggregator.publish(Collections.>singletonList(metricsEligibleSpan())); + aggregator.report(); + assertTrue(firstFlush.await(2, SECONDS)); + + // Reset writer-side mock interactions so the post-disable verify() blocks below only see + // what happens after the downgrade. features mock keeps accumulating call counts -- we use + // those counts as a latch on aggregator-thread reconcile timing. + reset(writer); + + // Flip the discovery state. disable()'s first action is features.discover() followed by a + // features.supportsMetrics() check; returning false here selects the clear path. + when(features.supportsMetrics()).thenReturn(false); + + // Fire DOWNGRADED on the test thread. This is the production scenario where the OkHttpSink + // callback thread triggers onEvent. disable() offers ClearSignal.CLEAR to the inbox but + // does not (and must not) mutate AggregateTable directly here. + aggregator.onEvent(EventListener.EventType.DOWNGRADED, ""); + + // First: verify nothing flushes immediately after disable. We can't pin reconcile-on-the- + // aggregator-thread as a latch here because CLEAR's inbox.clear() drops any REPORT we'd + // queue behind it -- so we just wait a window for any flush attempt to materialize. + verify(writer, after(500).never()).startBucket(anyInt(), anyLong(), anyLong()); + + // Stronger contract: prove the table is actually empty after CLEAR by re-enabling metrics + // and publishing a *marker* span with a distinct resource name. The next report should + // flush exactly one entry -- the marker -- with the original "resource" gone. If disable() + // had failed to clear the table (or had cleared it from the wrong thread and corrupted + // bucket chains), this assertion would catch it. + when(features.supportsMetrics()).thenReturn(true); + CountDownLatch postClearFlush = new CountDownLatch(1); + org.mockito.Mockito.doAnswer( + invocation -> { + postClearFlush.countDown(); + return null; + }) + .when(writer) + .finishBucket(); + aggregator.publish(Collections.>singletonList(markerSpan())); + aggregator.report(); + assertTrue(postClearFlush.await(2, SECONDS)); + + ArgumentCaptor entryCaptor = ArgumentCaptor.forClass(AggregateEntry.class); + verify(writer, times(1)).add(entryCaptor.capture()); + assertEquals( + "marker-resource", + entryCaptor.getValue().getResource().toString(), + "post-CLEAR bucket should contain only the marker -- the original entry was wiped"); + } finally { + aggregator.close(); + } + } + + @Test + void clearDoesNotTrampleQueuedStopSignal() throws Exception { + // CLEAR handler clears only the aggregates table; queued signals (STOP, REPORT) survive and + // get processed normally. + HealthMetrics healthMetrics = mock(HealthMetrics.class); + MetricWriter writer = mock(MetricWriter.class); + Sink sink = mock(Sink.class); + DDAgentFeaturesDiscovery features = mock(DDAgentFeaturesDiscovery.class); + when(features.supportsMetrics()).thenReturn(true); + when(features.peerTags()).thenReturn(Collections.emptySet()); + when(features.state()).thenReturn("state-1"); + + ConflatingMetricsAggregator aggregator = + new ConflatingMetricsAggregator( + Collections.emptySet(), + features, + healthMetrics, + sink, + writer, + /* maxAggregates */ 16, + /* queueSize */ 64, + /* reportingInterval */ 10, + SECONDS, + /* includeEndpointInMetrics */ false); + aggregator.start(); + + // Force at least one snapshot into the inbox so the aggregator has something to drain. + aggregator.publish(Collections.>singletonList(metricsEligibleSpan())); + + // Fire DOWNGRADED on this thread. disable() flips supportsMetrics() to false and offers + // CLEAR. Then immediately call close() which offers STOP. If CLEAR's handler clears the + // inbox, STOP gets trampled and close() hangs until the join timeout. + when(features.supportsMetrics()).thenReturn(false); + aggregator.onEvent(EventListener.EventType.DOWNGRADED, ""); + + // close() is synchronous; bound it ourselves rather than trusting THREAD_JOIN_TIMEOUT_MS. + long deadlineNanos = System.nanoTime() + java.util.concurrent.TimeUnit.SECONDS.toNanos(2); + Thread closer = new Thread(aggregator::close, "test-closer"); + closer.start(); + while (closer.isAlive() && System.nanoTime() < deadlineNanos) { + closer.join(50); + } + assertTrue( + !closer.isAlive(), + "close() must return promptly -- if CLEAR trampled STOP, this hangs out the join timeout"); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static CoreSpan metricsEligibleSpan() { + CoreSpan span = mock(CoreSpan.class); + when(span.isMeasured()).thenReturn(false); + when(span.isTopLevel()).thenReturn(true); + // Return true for any SpanKindFilter so peerTagSchemaFor enters the bootstrap path on the + // first publish. We want that bootstrap to fire (it's what makes features.state() + // observable), even though peerTags() returns emptySet here and the resulting schema has + // size 0. + when(span.isKind(any(SpanKindFilter.class))).thenReturn(true); + when(span.getLongRunningVersion()).thenReturn(0); + when(span.getDurationNano()).thenReturn(100L); + when(span.getError()).thenReturn(0); + when(span.getResourceName()).thenReturn("resource"); + when(span.getServiceName()).thenReturn("svc"); + when(span.getOperationName()).thenReturn("op"); + when(span.getServiceNameSource()).thenReturn(null); + when(span.getType()).thenReturn("web"); + when(span.getHttpStatusCode()).thenReturn((short) 200); + when(span.getParentId()).thenReturn(0L); + when(span.getOrigin()).thenReturn(null); + when(span.unsafeGetTag(eq(Tags.SPAN_KIND), any(CharSequence.class))).thenReturn("client"); + return span; + } + + /** + * Distinct from {@link #metricsEligibleSpan()} via the resource name: post-CLEAR the writer + * should see "marker-resource", proving the original "resource" entry is gone from the table. + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private static CoreSpan markerSpan() { + CoreSpan span = mock(CoreSpan.class); + when(span.isMeasured()).thenReturn(false); + when(span.isTopLevel()).thenReturn(true); + when(span.isKind(any(SpanKindFilter.class))).thenReturn(true); + when(span.getLongRunningVersion()).thenReturn(0); + when(span.getDurationNano()).thenReturn(100L); + when(span.getError()).thenReturn(0); + when(span.getResourceName()).thenReturn("marker-resource"); + when(span.getServiceName()).thenReturn("svc"); + when(span.getOperationName()).thenReturn("op"); + when(span.getServiceNameSource()).thenReturn(null); + when(span.getType()).thenReturn("web"); + when(span.getHttpStatusCode()).thenReturn((short) 200); + when(span.getParentId()).thenReturn(0L); + when(span.getOrigin()).thenReturn(null); + when(span.unsafeGetTag(eq(Tags.SPAN_KIND), any(CharSequence.class))).thenReturn("client"); + return span; + } +} diff --git a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy index 2972ffa2c18..4c4ee81b276 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy @@ -7,15 +7,14 @@ import datadog.metrics.api.Histograms import datadog.metrics.impl.DDSketchHistograms import datadog.trace.api.Config import datadog.trace.api.WellKnownTags -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString -import datadog.trace.common.metrics.AggregateMetric +import datadog.trace.common.metrics.AggregateEntry import datadog.trace.common.metrics.EventListener -import datadog.trace.common.metrics.MetricKey import datadog.trace.common.metrics.OkHttpSink +import datadog.trace.common.metrics.PeerTagSchema import datadog.trace.common.metrics.SerializingMetricWriter +import datadog.trace.common.metrics.SpanSnapshot import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicLongArray import okhttp3.HttpUrl class MetricsIntegrationTest extends AbstractTraceAgentTest { @@ -39,14 +38,22 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest { sink ) writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10)) - writer.add( - new MetricKey("resource1", "service1", "operation1", null, "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null), - new AggregateMetric().recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5)) - ) - writer.add( - new MetricKey("resource2", "service2", "operation2", null, "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null), - new AggregateMetric().recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9)) - ) + // Build entries via the production AggregateEntry.forSnapshot(snap, keyHash) path -- same + // construction as AggregateTable.findOrInsert. Both entries use one peer tag (grault:quux) + // -> schema names=["grault"], values=["quux"]. + PeerTagSchema schema = PeerTagSchema.testSchema(["grault"] as String[]) + SpanSnapshot snap1 = new SpanSnapshot( + "resource1", "service1", "operation1", null, "sql", (short) 0, + false, true, "xyzzy", schema, ["quux"] as String[], null, null, null, 0L) + def entry1 = new AggregateEntry(snap1, AggregateEntry.hashOf(snap1)) + [2, 1, 2, 250, 4].each { entry1.recordOneDuration(it as long) } + writer.add(entry1) + SpanSnapshot snap2 = new SpanSnapshot( + "resource2", "service2", "operation2", null, "web", (short) 200, + false, true, "xyzzy", schema, ["quux"] as String[], null, null, null, 0L) + def entry2 = new AggregateEntry(snap2, AggregateEntry.hashOf(snap2)) + [1, 1, 200, 2, 3, 4, 5, 6, 7, 8].each { entry2.recordOneDuration(it as long) } + writer.add(entry2) writer.finishBucket() then: diff --git a/internal-api/src/main/java/datadog/trace/util/Hashtable.java b/internal-api/src/main/java/datadog/trace/util/Hashtable.java index 8f40e4609bc..ff3202c1f33 100644 --- a/internal-api/src/main/java/datadog/trace/util/Hashtable.java +++ b/internal-api/src/main/java/datadog/trace/util/Hashtable.java @@ -482,7 +482,24 @@ MutatingBucketIterator mutatingBucketIterator( */ public static final MutatingTableIterator mutatingTableIterator(Hashtable.Entry[] buckets) { - return new MutatingTableIterator(buckets); + return new MutatingTableIterator(buckets, 0, buckets.length); + } + + /** + * Variant of {@link #mutatingTableIterator(Hashtable.Entry[])} that walks only the half-open + * bucket range {@code [startBucket, endBucket)}. Useful for resumable sweeps -- e.g. cursor- + * based eviction in {@code AggregateTable} -- where one call drives {@code [cursor, length)} + * and a wrap-around call drives {@code [0, cursor)}. The iterator does not wrap around + * within a single instance; callers compose two iterators when wrap-around is desired. An empty + * range ({@code startBucket == endBucket}) produces an immediately exhausted iterator. + * + * @param startBucket inclusive lower bound; must be in {@code [0, buckets.length]}. + * @param endBucket exclusive upper bound; must be in {@code [startBucket, buckets.length]}. + */ + public static final + MutatingTableIterator mutatingTableIterator( + Hashtable.Entry[] buckets, int startBucket, int endBucket) { + return new MutatingTableIterator(buckets, startBucket, endBucket); } public static final int bucketIndex(Object[] buckets, long keyHash) { @@ -752,6 +769,9 @@ public static final class MutatingTableIterator implements Iterator { private final Hashtable.Entry[] buckets; + /** Exclusive upper bound for bucket indices visited by this iterator. */ + private final int endBucket; + /** * Index of the bucket holding {@link #nextEntry} (or holding {@link #curEntry} after remove). */ @@ -782,9 +802,34 @@ public static final class MutatingTableIterator */ private Hashtable.Entry curEntry; - MutatingTableIterator(Hashtable.Entry[] buckets) { + MutatingTableIterator(Hashtable.Entry[] buckets, int startBucket, int endBucket) { this.buckets = buckets; - seekFromBucket(0); + if (startBucket < 0 || startBucket > buckets.length) { + throw new IndexOutOfBoundsException( + "startBucket " + startBucket + " out of range [0, " + buckets.length + "]"); + } + if (endBucket < startBucket || endBucket > buckets.length) { + throw new IndexOutOfBoundsException( + "endBucket " + + endBucket + + " out of range [" + + startBucket + + ", " + + buckets.length + + "]"); + } + this.endBucket = endBucket; + seekFromBucket(startBucket); + } + + /** + * Bucket index of the entry last returned by {@link #next()}, or {@code -1} if {@code next} has + * not yet been called or the most recent call was {@link #remove()}. Useful for callers driving + * a cursor — e.g. resumable eviction sweeps that want to remember where the last successful + * removal landed. + */ + public int currentBucket() { + return this.curBucketIndex; } @Override @@ -841,12 +886,12 @@ public void remove() { } /** - * Advance {@code nextBucketIndex} / {@code nextEntry} to the first non-empty bucket >= {@code - * from}. + * Advance {@code nextBucketIndex} / {@code nextEntry} to the first non-empty bucket {@code >= + * from} within {@code [0, endBucket)}. */ private void seekFromBucket(int from) { Hashtable.Entry[] thisBuckets = this.buckets; - for (int i = from; i < thisBuckets.length; i++) { + for (int i = from; i < this.endBucket; i++) { Hashtable.Entry head = thisBuckets[i]; if (head != null) { this.nextBucketIndex = i; diff --git a/internal-api/src/test/java/datadog/trace/util/HashtableTest.java b/internal-api/src/test/java/datadog/trace/util/HashtableTest.java index 2992279be6d..953453ca3aa 100644 --- a/internal-api/src/test/java/datadog/trace/util/HashtableTest.java +++ b/internal-api/src/test/java/datadog/trace/util/HashtableTest.java @@ -349,5 +349,64 @@ void removeTwiceWithoutInterveningNextThrows() { it.remove(); assertThrows(IllegalStateException.class, it::remove); } + + @Test + void halfOpenRangeOmitsBucketsOutsideTheRange() { + // CollidingKey lets us pin entries to specific buckets via controlled hashCode. 16-slot + // table -> bucketIndex = hash & 15. Place entries in buckets 0, 5, and 10; iterate + // [5, 10) -- should see only bucket 5. + Hashtable.D1 table = new Hashtable.D1<>(16); + table.insert(new CollidingKeyEntry(new CollidingKey("b0", 0), 1)); + table.insert(new CollidingKeyEntry(new CollidingKey("b5", 5), 2)); + table.insert(new CollidingKeyEntry(new CollidingKey("b10", 10), 3)); + + Set seen = new HashSet<>(); + for (MutatingTableIterator it = + Support.mutatingTableIterator(table.buckets, 5, 10); + it.hasNext(); ) { + seen.add(it.next().key.label); + } + assertEquals(1, seen.size()); + assertTrue(seen.contains("b5")); + } + + @Test + void emptyHalfOpenRangeIsExhausted() { + // start == end -> immediately-exhausted iterator. Important: this is the wrap-around + // pass [0, cursor) when cursor == 0 in resumable sweeps. + Hashtable.D1 table = new Hashtable.D1<>(8); + table.insert(new StringIntEntry("a", 1)); + MutatingTableIterator it = Support.mutatingTableIterator(table.buckets, 0, 0); + assertFalse(it.hasNext()); + } + + @Test + void rangeBoundsOutOfOrderThrows() { + Hashtable.D1 table = new Hashtable.D1<>(8); + assertThrows( + IndexOutOfBoundsException.class, + () -> Support.mutatingTableIterator(table.buckets, -1, 4)); + assertThrows( + IndexOutOfBoundsException.class, + () -> Support.mutatingTableIterator(table.buckets, 4, 2)); // end < start + assertThrows( + IndexOutOfBoundsException.class, + () -> + Support.mutatingTableIterator( + table.buckets, 0, table.buckets.length + 1)); // end > len + } + + @Test + void currentBucketReportsLandingIndex() { + // Pin one entry to a known bucket and check currentBucket() after next() reports that + // bucket. Before any next() (or after remove()), currentBucket() returns -1. + Hashtable.D1 table = new Hashtable.D1<>(16); + table.insert(new CollidingKeyEntry(new CollidingKey("b3", 3), 1)); + + MutatingTableIterator it = Support.mutatingTableIterator(table.buckets); + assertEquals(-1, it.currentBucket(), "before any next() currentBucket should be -1"); + it.next(); + assertEquals(3, it.currentBucket(), "currentBucket should report the entry's bucket"); + } } }