Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1045a85
feat: wall-clock precheck and signal suppression
kaahos May 28, 2026
aed7b1a
Merge branch 'main' into paul.fournillon/wallclock-suppression
kaahos May 28, 2026
7a250b6
fix
kaahos May 29, 2026
6028fdd
Merge branch 'main' into paul.fournillon/wallclock-suppression
kaahos May 29, 2026
1e1bcd1
fix: fix build + tests
kaahos May 29, 2026
b1cb73f
fix: fix mem leaks in tests
kaahos May 29, 2026
a3a9462
fix: track wall precheck block state in thread filter
kaahos Jun 1, 2026
137065c
fix: arm wall precheck after recording sample
kaahos Jun 1, 2026
c7caa46
fix: include wait states in wall precheck suppression
kaahos Jun 1, 2026
55073d0
Fix ProfiledThread ownership in park_state_ut
kaahos Jun 1, 2026
619449a
Add Java block-state bridge for wall-clock precheck
kaahos Jun 1, 2026
1cd0f8b
Fix wall-clock thread filter reset
kaahos Jun 2, 2026
3ee7f42
Gate wall-clock precheck on untraced context
kaahos Jun 2, 2026
6bda356
Merge branch 'main' into paul.fournillon/wallclock-suppression
kaahos Jun 3, 2026
8bb1fed
fix: avoid exact suppression for unowned blocked states
kaahos Jun 8, 2026
f82fe70
Merge branch 'main' into paul.fournillon/wallclock-suppression
kaahos Jun 9, 2026
6ece88d
fix: address ownership correctness review
kaahos Jun 11, 2026
110abaa
fix: address thread filter review
kaahos Jun 11, 2026
e9f53b9
fix: factorize code and add support for jvmti
kaahos Jun 12, 2026
e2d60da
fix: fix wall-clock counters and misleading comment
kaahos Jun 12, 2026
5ab946c
Merge branch 'main' into paul.fournillon/wallclock-suppression
kaahos Jun 12, 2026
37df0a5
fix: apply review about test and unused stuff
kaahos Jun 12, 2026
43471ee
fix: apply review about drainSuppressedSampledRun
kaahos Jun 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ddprof-lib/src/main/cpp/arguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,14 @@ Error Arguments::parse(const char *args) {
_jvmtistacks = true;
}

CASE("wallprecheck")
if (value != NULL) {
_wall_precheck = strcmp(value, "false") != 0 && strcmp(value, "0") != 0;
} else {
// No value means enable
_wall_precheck = true;
}

CASE("wallsampler")
if (value != NULL) {
switch (value[0]) {
Expand Down
2 changes: 2 additions & 0 deletions ddprof-lib/src/main/cpp/arguments.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class Arguments {
long _cpu;
long _wall;
bool _wall_collapsing;
bool _wall_precheck;
int _wall_threads_per_tick;
WallclockSampler _wallclock_sampler;
long _memory;
Expand Down Expand Up @@ -207,6 +208,7 @@ class Arguments {
_cpu(-1),
_wall(-1),
_wall_collapsing(false),
_wall_precheck(false),
_wall_threads_per_tick(DEFAULT_WALL_THREADS_PER_TICK),
_wallclock_sampler(ASGCT),
_memory(-1),
Expand Down
4 changes: 4 additions & 0 deletions ddprof-lib/src/main/cpp/counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
X(AGCT_NATIVE_NO_JAVA_CONTEXT, "agct_native_no_java_context") \
X(AGCT_BLOCKED_IN_VM, "agct_blocked_in_vm") \
X(SKIPPED_WALLCLOCK_UNWINDS, "skipped_wallclock_unwinds") \
X(WC_SIGNAL_SUPPRESSED_SAMPLED_RUN, "wc_signals_suppressed_sampled_run") \
X(WC_UNOWNED_BLOCKED_SUPPRESSED, "wc_unowned_blocked_suppressed") \
X(WC_UNOWNED_BLOCKED_RECORDED, "wc_unowned_blocked_recorded") \
X(WC_SIGNAL_QUEUE_FULL, "wc_signals_queue_full") \
X(UNWINDING_TIME_ASYNC, "unwinding_ticks_async") \
X(UNWINDING_TIME_JVMTI, "unwinding_ticks_jvmti") \
X(CALLTRACE_STORAGE_DROPPED, "calltrace_storage_dropped_traces") \
Expand Down
11 changes: 10 additions & 1 deletion ddprof-lib/src/main/cpp/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ class WallClockEpochEvent {
u32 _num_failed_samples;
u32 _num_exited_threads;
u32 _num_permission_denied;
u64 _num_suppressed_sampled_run;

WallClockEpochEvent(u64 start_time)
: _dirty(false), _start_time(start_time), _duration_millis(0),
_num_samplable_threads(0), _num_successful_samples(0),
_num_failed_samples(0), _num_exited_threads(0),
_num_permission_denied(0) {}
_num_permission_denied(0), _num_suppressed_sampled_run(0) {}

// Reports the current value of the _dirty flag, which the mutators of this
// event set to true when they change a recorded value.
bool hasChanged() { return _dirty; }

Expand Down Expand Up @@ -166,13 +167,21 @@ class WallClockEpochEvent {
}
}

// Adds `n` to the suppressed-sampled-run counter of this epoch and marks the
// event dirty. A call with nothing to add leaves both the counter and the
// dirty flag untouched.
void addNumSuppressedSampledRun(u64 n) {
  if (!(n > 0)) {
    return;
  }
  _dirty = true;
  _num_suppressed_sampled_run += n;
}

// Stores `millis` as the duration of this epoch (_duration_millis).
void endEpoch(u64 millis) { _duration_millis = millis; }

// Clears the dirty flag without touching any counter.
void clean() { _dirty = false; }

// Begins a new epoch starting at `start_time`: the accumulated
// suppressed-sampled-run counter is zeroed and the dirty flag is cleared.
// The other counters of the event are not modified here.
void newEpoch(u64 start_time) {
  _start_time = start_time;
  _num_suppressed_sampled_run = 0;
  _dirty = false;
}
};

Expand Down
1 change: 1 addition & 0 deletions ddprof-lib/src/main/cpp/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,7 @@ void Recording::recordWallClockEpoch(Buffer *buf, WallClockEpochEvent *event) {
buf->putVar64(event->_num_failed_samples);
buf->putVar64(event->_num_exited_threads);
buf->putVar64(event->_num_permission_denied);
buf->putVar64(event->_num_suppressed_sampled_run);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}
Expand Down
9 changes: 9 additions & 0 deletions ddprof-lib/src/main/cpp/hotspot/vmStructs.inline.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@

#include "hotspot/vmStructs.h"
#include "jvmThread.h"
#include "vmEntry.h"

// Returns the calling thread as a VMThread*, or nullptr when the VM is not
// HotSpot or when JVMThread::current() yields no thread.
VMThread* VMThread::current() {
  // The value returned by JVMThread::current() is the native thread self
  // pointer. It is only a HotSpot JavaThread* on HotSpot; on OpenJ9/Zing it
  // must not be reinterpreted as a VMThread*.
  if (!VM::isHotspot()) {
    return nullptr;
  }
  auto self = JVMThread::current();
  if (self == nullptr) {
    return nullptr;
  }
  return VMThread::cast(self);
}

VMThread* VMThread::fromJavaThread(JNIEnv* env, jthread thread) {
if (!VM::isHotspot()) {
return nullptr;
}
assert(_eetop != nullptr);
if (_eetop != nullptr) {
jlong eetop = env->GetLongField(thread, _eetop);
Expand Down
96 changes: 94 additions & 2 deletions ddprof-lib/src/main/cpp/javaApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "counters.h"
#include "common.h"
#include "engine.h"
#include "hotspot/vmStructs.h"
#include "hotspot/vmStructs.inline.h"
#include "incbin.h"
#include "jvmThread.h"
#include "os.h"
Expand Down Expand Up @@ -155,10 +155,13 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0() {
slot_id = thread_filter->registerThread();
current->setFilterSlotId(slot_id);
}

if (unlikely(slot_id == -1)) {
return; // Failed to register thread
}
// Reset suppression state so a new thread occupying this slot does not inherit
// stale state from its predecessor. Must happen before add().
thread_filter->resetSlotRunState(slot_id);
thread_filter->add(tid, slot_id);
}

Expand Down Expand Up @@ -314,6 +317,95 @@ Java_com_datadoghq_profiler_JavaProfiler_recordQueueEnd0(
Profiler::instance()->recordQueueTime(tid, &event);
}

// JNI entry point for JavaProfiler.parkEnter0.
//
// Notifies the calling ProfiledThread that it is entering a park. When
// parkEnter() returns true and the thread filter is enabled, a blocked run in
// state CONDVAR_WAIT is opened on the thread's filter slot and the token
// returned by enterBlockedRun() is stored on the thread via
// setParkBlockToken(). Nothing is done for threads without a ProfiledThread
// or without a registered (non-negative) filter slot.
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_parkEnter0(JNIEnv *env, jclass unused) {
  ProfiledThread *self = ProfiledThread::current();
  if (self == nullptr) {
    return;
  }

  // parkEnter() is always invoked, even when the filter turns out to be
  // disabled, so the thread's own park bookkeeping stays up to date.
  const bool first_park = self->parkEnter();
  ThreadFilter *filter = Profiler::instance()->threadFilter();
  if (!first_park || !filter->enabled()) {
    return;
  }

  const ThreadFilter::SlotID slot = self->filterSlotId();
  if (slot < 0) {
    return;
  }
  self->setParkBlockToken(
      filter->enterBlockedRun(slot, OSThreadState::CONDVAR_WAIT));
}

extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_parkExit0(
JNIEnv *env, jclass unused, jlong blocker, jlong unblockingSpanId) {
ProfiledThread *current = ProfiledThread::current();
if (current == nullptr) {
return;
}
u64 park_block_token = 0;
if (!current->parkExit(park_block_token) || park_block_token == 0) {
return;
}
ThreadFilter *tf = Profiler::instance()->threadFilter();
if (tf->enabled()) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Sphinx Review — HIGH] parkExit0 ignores the return value of current->parkExit() and always calls tf->exitBlockedRun(slot_id) unconditionally — even when the thread was not parked (FLAG_PARKED was not set) or when the slot is owned by blockEnter0. This clears an active block run it does not own, disabling suppression for the remaining sleep interval.

Suggestion: `bool was_parked = current->parkExit(start_ticks, park_context); if (was_parked) tf->exitBlockedRun(slot_id);`. Alternatively, store the enterBlockedRun token during parkEnter0 and use the generation-checked overload.

Confirmed by adversary.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in 6ece88d.

ThreadFilter::SlotID slot_id = ThreadFilter::tokenSlotId(park_block_token);
if (current->filterSlotId() == slot_id) {
tf->exitBlockedRun(slot_id, ThreadFilter::tokenGeneration(park_block_token));
}
}
}

// Translates the integer block state passed from Java into an OSThreadState.
//
// Only OSThreadState::SLEEPING is accepted: in that case `decoded` is set to
// SLEEPING and true is returned. For every other value `decoded` is set to
// OSThreadState::UNKNOWN and false is returned.
static bool decodeJavaBlockState(jint state, OSThreadState &decoded) {
  const bool is_sleeping =
      state == static_cast<jint>(OSThreadState::SLEEPING);
  decoded = is_sleeping ? OSThreadState::SLEEPING : OSThreadState::UNKNOWN;
  return is_sleeping;
}

// JNI entry point for JavaProfiler.blockEnter0.
//
// Opens a blocked run for the calling thread in the state described by
// `state` and returns the token produced by ThreadFilter::enterBlockedRun().
// Returns 0 when no run was opened: the state is not accepted by
// decodeJavaBlockState(), the thread has no ProfiledThread, the thread filter
// is disabled, or the thread has no registered (non-negative) filter slot.
extern "C" DLLEXPORT jlong JNICALL
Java_com_datadoghq_profiler_JavaProfiler_blockEnter0(
    JNIEnv *env, jclass unused, jint state) {
  jlong token = 0;
  OSThreadState block_state;
  if (decodeJavaBlockState(state, block_state)) {
    ProfiledThread *self = ProfiledThread::current();
    if (self != nullptr) {
      ThreadFilter *filter = Profiler::instance()->threadFilter();
      if (filter->enabled()) {
        const ThreadFilter::SlotID slot = self->filterSlotId();
        if (slot >= 0) {
          token = static_cast<jlong>(filter->enterBlockedRun(slot, block_state));
        }
      }
    }
  }
  return token;
}

// JNI entry point for JavaProfiler.blockExit0.
//
// Closes the blocked run identified by `token` (a value previously returned
// by blockEnter0). The call is a no-op when the token is 0, when the calling
// thread has no ProfiledThread, when the slot encoded in the token is not the
// calling thread's current filter slot, or when the thread filter is
// disabled. The generation encoded in the token is forwarded to
// exitBlockedRun().
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_blockExit0(
    JNIEnv *env, jclass unused, jlong token) {
  const u64 block_token = static_cast<u64>(token);
  if (block_token == 0) {
    return;
  }

  ProfiledThread *self = ProfiledThread::current();
  if (self == nullptr) {
    return;
  }

  const ThreadFilter::SlotID slot = ThreadFilter::tokenSlotId(block_token);
  if (self->filterSlotId() == slot) {
    ThreadFilter *filter = Profiler::instance()->threadFilter();
    if (filter->enabled()) {
      filter->exitBlockedRun(slot, ThreadFilter::tokenGeneration(block_token));
    }
  }
}

extern "C" DLLEXPORT jlong JNICALL
Java_com_datadoghq_profiler_JavaProfiler_currentTicks0(JNIEnv *env,
jclass unused) {
Expand Down
4 changes: 3 additions & 1 deletion ddprof-lib/src/main/cpp/jfrMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ void JfrMetadata::initialize(
<< field("numExitedThreads", T_INT,
"Number of Exited Threads Before Handling Signal")
<< field("numPermissionDenied", T_INT,
"Number of Permission Denied Errors"))
"Number of Permission Denied Errors")
<< field("numSuppressedSampledRun", T_LONG,
"Signals suppressed by the wall-clock once-per-run filter"))

<< (type("datadog.ObjectSample", T_ALLOC, "Allocation sample")
<< category("Datadog", "Profiling")
Expand Down
10 changes: 5 additions & 5 deletions ddprof-lib/src/main/cpp/livenessTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,10 @@ Error LivenessTracker::initialize(Arguments &args) {
return Error::OK;
}

// _record_heap_usage controls per-session JFR event emission only, not the
// tracking table. Update it before the _initialized guard so each profiler
// start gets the correct setting even when the table persists across recordings.
_record_heap_usage = args._record_heap_usage;

if (_initialized) {
// Once heap usage recording has been enabled it stays on for the JVM lifetime,
// so a later recording without ':L' does not silently drop HeapUsage events.
_record_heap_usage = _record_heap_usage || args._record_heap_usage;
// if the tracker was previously initialized return the stored result for
// consistency this hack also means that if the profiler is started with
// different arguments for liveness tracking those will be ignored it is
Expand Down Expand Up @@ -270,6 +268,8 @@ Error LivenessTracker::initialize(Arguments &args) {
// enough for 1G of heap
_table = (TrackingEntry *)malloc(sizeof(TrackingEntry) * _table_cap);

_record_heap_usage = args._record_heap_usage;

_gc_epoch = 0;
_last_gc_epoch = 0;

Expand Down
28 changes: 19 additions & 9 deletions ddprof-lib/src/main/cpp/profiler.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright The async-profiler authors
* Copyright 2024, 2025 Datadog, Inc
* Copyright 2024, 2026 Datadog, Inc
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -36,6 +36,7 @@
#include "tsc.h"
#include "utils.h"
#include "wallClock.h"
#include "wallClockCounters.h"
#include "frames.h"

#include <algorithm>
Expand Down Expand Up @@ -81,6 +82,7 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
if (_thread_filter.enabled()) {
int slot_id = _thread_filter.registerThread();
current->setFilterSlotId(slot_id);
_thread_filter.resetSlotRunState(slot_id);
_thread_filter.remove(slot_id); // Remove from filtering initially
}
if (thread != NULL) {
Expand Down Expand Up @@ -551,7 +553,7 @@ void Profiler::recordDeferredSample(int tid, u64 call_trace_id, jint event_type,
_locks[lock_index].unlock();
}

void Profiler::recordSample(void *ucontext, u64 counter, int tid,
bool Profiler::recordSample(void *ucontext, u64 counter, int tid,
jint event_type, u64 call_trace_id, Event *event) {
atomicIncRelaxed(_total_samples);

Expand All @@ -567,7 +569,7 @@ void Profiler::recordSample(void *ucontext, u64 counter, int tid,
// collected trace
PerfEvents::resetBuffer(tid);
}
return;
return false;
}

bool truncated = false;
Expand Down Expand Up @@ -617,12 +619,13 @@ void Profiler::recordSample(void *ucontext, u64 counter, int tid,
_jfr.recordEvent(lock_index, tid, call_trace_id, event_type, event);

_locks[lock_index].unlock();
return true;
}

void Profiler::recordSampleDelegated(void *ucontext, u64 weight, int tid,
bool Profiler::recordSampleDelegated(void *ucontext, u64 weight, int tid,
jint event_type, Event *event) {
if (!VM::canRequestStackTrace()) {
return;
return false;
}

// Reserve the correlation ID up-front so we can pass the same value to the
Expand All @@ -637,7 +640,7 @@ void Profiler::recordSampleDelegated(void *ucontext, u64 weight, int tid,
} else {
Counters::increment(JVMTI_STACKS_FAILED_OTHER);
}
return;
return false;
}

atomicIncRelaxed(_total_samples);
Expand All @@ -650,11 +653,12 @@ void Profiler::recordSampleDelegated(void *ucontext, u64 weight, int tid,
// The JVM-side stack trace request is already in flight; we just drop our
// sample event. The dangling StackTraceRequest entry in the JVM recording
// will simply have no matching datadog event, which is harmless.
return;
return false;
}

_jfr.recordEventDelegated(lock_index, tid, correlation_id, event_type, event);
_locks[lock_index].unlock();
return true;
}

void Profiler::recordWallClockEpoch(int tid, WallClockEpochEvent *event) {
Expand Down Expand Up @@ -1204,6 +1208,7 @@ Error Profiler::start(Arguments &args, bool reset) {
_omit_stacktraces = args._lightweight;
_remote_symbolication = args._remote_symbolication;
_libs->setRemoteSymbolication(_remote_symbolication);
_wall_precheck = args._wall_precheck;
_event_mask =
((args._event != NULL && strcmp(args._event, EVENT_NOOP) != 0) ? EM_CPU
: 0) |
Expand Down Expand Up @@ -1259,6 +1264,7 @@ Error Profiler::start(Arguments &args, bool reset) {
unlockAll();
}
Counters::reset();
WallClockCounters::reset();

// Reset thread names and IDs
_thread_info.clearAll();
Expand Down Expand Up @@ -1303,10 +1309,14 @@ Error Profiler::start(Arguments &args, bool reset) {

// Minor optim: Register the current thread (start thread won't be called)
if (_thread_filter.enabled()) {
_thread_filter.clearActive();
ProfiledThread *current = ProfiledThread::current();
assert(current != nullptr);
int slot_id = _thread_filter.registerThread();
current->setFilterSlotId(slot_id);
int slot_id = current->filterSlotId();
if (slot_id < 0) {
slot_id = _thread_filter.registerThread();
current->setFilterSlotId(slot_id);
}
_thread_filter.remove(slot_id); // Remove from filtering initially (matches onThreadStart behavior)
}

Expand Down
Loading
Loading