Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions ddprof-lib/src/main/cpp/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,27 @@ class WallClockEpochEvent {
class TraceRootEvent {
public:
u64 _local_root_span_id;
u64 _parent_span_id;
u64 _start_ticks;
u32 _label;
u32 _operation;

TraceRootEvent(u64 local_root_span_id, u32 label, u32 operation)
: _local_root_span_id(local_root_span_id), _label(label),
_operation(operation){};
TraceRootEvent(u64 local_root_span_id, u64 parent_span_id, u64 start_ticks,
u32 label, u32 operation)
: _local_root_span_id(local_root_span_id),
_parent_span_id(parent_span_id), _start_ticks(start_ticks),
_label(label), _operation(operation){};
};

typedef struct TaskBlockEvent {
u64 _start_ticks;
u64 _end_ticks;
u64 _span_id;
u64 _root_span_id;
uintptr_t _blocker;
u64 _unblocking_span_id;
} TaskBlockEvent;

typedef struct QueueTimeEvent {
u64 _start;
u64 _end;
Expand All @@ -171,6 +184,7 @@ typedef struct QueueTimeEvent {
u32 _origin;
u32 _queueType;
u32 _queueLength;
u64 _submitting_span_id;
} QueueTimeEvent;

#endif // _EVENT_H
73 changes: 71 additions & 2 deletions ddprof-lib/src/main/cpp/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1531,13 +1531,57 @@ void Recording::recordTraceRoot(Buffer *buf, int tid, TraceRootEvent *event) {
flushIfNeeded(buf);
int start = buf->skip(1);
buf->putVar64(T_ENDPOINT);
buf->putVar64(TSC::ticks());
buf->put8(0);
buf->putVar64(event->_start_ticks);
buf->putVar64(TSC::ticks() - event->_start_ticks);
buf->putVar32(tid);
buf->put8(0);
buf->putVar32(event->_label);
buf->putVar32(event->_operation);
buf->putVar64(event->_local_root_span_id);
buf->putVar64(event->_parent_span_id);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}

void Recording::recordTaskBlock(Buffer *buf, int tid, TaskBlockEvent *event) {
flushIfNeeded(buf);
int start = buf->skip(1);
buf->putVar64(T_TASK_BLOCK);
buf->putVar64(event->_start_ticks);
buf->putVar64(event->_end_ticks - event->_start_ticks);
buf->putVar32(tid);
buf->put8(0);
buf->putVar64(event->_span_id);
buf->putVar64(event->_root_span_id);
buf->putVar64((u64)event->_blocker);
buf->putVar64(event->_unblocking_span_id);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}

void Recording::recordSpanNode(Buffer *buf, int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId,
u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource) {
// Convert epoch nanoseconds to JFR ticks so that standard JFR tooling (JMC, Mission
// Control) can correlate SpanNode events with other events on the recording timeline.
// _start_time is in microseconds; multiply by 1000 to get the recording epoch in nanos.
u64 start_epoch_nanos = _start_time * 1000ULL;
u64 startTicks = _start_ticks + (u64)((double)(long long)(startNanos - start_epoch_nanos)
* TSC::frequency() / NANOTIME_FREQ);
u64 durationTicks = (u64)((double)durationNanos * TSC::frequency() / NANOTIME_FREQ);

flushIfNeeded(buf);
int start = buf->skip(1);
buf->putVar64(T_SPAN_NODE);
buf->putVar64(startTicks); // startTime (F_TIME_TICKS)
buf->putVar64(durationTicks); // duration (F_DURATION_TICKS)
buf->putVar32(tid); // eventThread (F_CPOOL)
buf->putVar64(spanId);
buf->putVar64(parentSpanId);
buf->putVar64(rootSpanId);
buf->putVar64(startNanos); // startNanos — epoch ns, used by backend extractor
buf->putVar64(durationNanos); // durationNanos — ns
buf->putVar32(encodedOperation);
buf->putVar32(encodedResource);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}
Expand All @@ -1554,6 +1598,7 @@ void Recording::recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event) {
buf->putVar64(event->_queueType);
buf->putVar64(event->_queueLength);
writeContext(buf, Contexts::get());
buf->putVar64(event->_submitting_span_id);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}
Expand Down Expand Up @@ -1761,6 +1806,30 @@ void FlightRecorder::recordQueueTime(int lock_index, int tid,
}
}

void FlightRecorder::recordTaskBlock(int lock_index, int tid,
TaskBlockEvent *event) {
OptionalSharedLockGuard locker(&_rec_lock);
if (locker.ownsLock()) {
Recording* rec = _rec;
if (rec != nullptr) {
Buffer *buf = rec->buffer(lock_index);
rec->recordTaskBlock(buf, tid, event);
}
}
}

void FlightRecorder::recordSpanNode(int lock_index, int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId,
u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource) {
OptionalSharedLockGuard locker(&_rec_lock);
if (locker.ownsLock()) {
Recording* rec = _rec;
if (rec != nullptr) {
Buffer *buf = rec->buffer(lock_index);
rec->recordSpanNode(buf, tid, spanId, parentSpanId, rootSpanId, startNanos, durationNanos, encodedOperation, encodedResource);
}
}
}

void FlightRecorder::recordDatadogSetting(int lock_index, int length,
const char *name, const char *value,
const char *unit) {
Expand Down
6 changes: 6 additions & 0 deletions ddprof-lib/src/main/cpp/flightRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ class Recording {
void recordWallClockEpoch(Buffer *buf, WallClockEpochEvent *event);
void recordTraceRoot(Buffer *buf, int tid, TraceRootEvent *event);
void recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event);
void recordTaskBlock(Buffer *buf, int tid, TaskBlockEvent *event);
void recordSpanNode(Buffer *buf, int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId,
u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource);
void recordAllocation(RecordingBuffer *buf, int tid, u64 call_trace_id,
AllocEvent *event);
void recordHeapLiveObject(Buffer *buf, int tid, u64 call_trace_id,
Expand Down Expand Up @@ -344,6 +347,9 @@ class FlightRecorder {
void wallClockEpoch(int lock_index, WallClockEpochEvent *event);
void recordTraceRoot(int lock_index, int tid, TraceRootEvent *event);
void recordQueueTime(int lock_index, int tid, QueueTimeEvent *event);
void recordTaskBlock(int lock_index, int tid, TaskBlockEvent *event);
void recordSpanNode(int lock_index, int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId,
u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource);

bool active() const { return _rec != NULL; }

Expand Down
38 changes: 34 additions & 4 deletions ddprof-lib/src/main/cpp/javaApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env,

extern "C" DLLEXPORT jboolean JNICALL
Java_com_datadoghq_profiler_JavaProfiler_recordTrace0(
JNIEnv *env, jclass unused, jlong rootSpanId, jstring endpoint,
jstring operation, jint sizeLimit) {
JNIEnv *env, jclass unused, jlong rootSpanId, jlong parentSpanId,
jlong startTicks, jstring endpoint, jstring operation, jint sizeLimit) {
JniString endpoint_str(env, endpoint);
u32 endpointLabel = Profiler::instance()->stringLabelMap()->bounded_lookup(
endpoint_str.c_str(), endpoint_str.length(), sizeLimit);
Expand All @@ -212,13 +212,41 @@ Java_com_datadoghq_profiler_JavaProfiler_recordTrace0(
operationLabel = Profiler::instance()->contextValueMap()->bounded_lookup(
operation_str.c_str(), operation_str.length(), 1 << 16);
}
TraceRootEvent event(rootSpanId, endpointLabel, operationLabel);
TraceRootEvent event(rootSpanId, (u64)parentSpanId, (u64)startTicks,
endpointLabel, operationLabel);
int tid = ProfiledThread::currentTid();
Profiler::instance()->recordTraceRoot(tid, &event);
}
return acceptValue;
}

extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_recordTaskBlock0(
JNIEnv *env, jclass unused, jlong startTicks, jlong endTicks,
jlong spanId, jlong rootSpanId, jlong blocker, jlong unblockingSpanId) {
TaskBlockEvent event;
event._start_ticks = (u64)startTicks;
event._end_ticks = (u64)endTicks;
event._span_id = (u64)spanId;
event._root_span_id = (u64)rootSpanId;
event._blocker = (uintptr_t)blocker;
event._unblocking_span_id = (u64)unblockingSpanId;
int tid = ProfiledThread::currentTid();
Profiler::instance()->recordTaskBlock(tid, &event);
}

extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_recordSpanNode0(
JNIEnv *env, jclass unused,
jlong spanId, jlong parentSpanId, jlong rootSpanId,
jlong startNanos, jlong durationNanos,
jint encodedOperation, jint encodedResource) {
int tid = ProfiledThread::currentTid();
Profiler::instance()->recordSpanNode(tid, (u64)spanId, (u64)parentSpanId, (u64)rootSpanId,
(u64)startNanos, (u64)durationNanos,
(u32)encodedOperation, (u32)encodedResource);
}

extern "C" DLLEXPORT jint JNICALL
Java_com_datadoghq_profiler_JavaProfiler_registerConstant0(JNIEnv *env,
jclass unused,
Expand Down Expand Up @@ -291,7 +319,8 @@ static int dictionarizeClassName(JNIEnv* env, jstring className) {
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_recordQueueEnd0(
JNIEnv *env, jclass unused, jlong startTime, jlong endTime, jstring task,
jstring scheduler, jthread origin, jstring queueType, jint queueLength) {
jstring scheduler, jthread origin, jstring queueType, jint queueLength,
jlong submittingSpanId) {
int tid = ProfiledThread::currentTid();
if (tid < 0) {
return;
Expand Down Expand Up @@ -321,6 +350,7 @@ Java_com_datadoghq_profiler_JavaProfiler_recordQueueEnd0(
event._origin = origin_tid;
event._queueType = queue_type_offset;
event._queueLength = queueLength;
event._submitting_span_id = (u64)submittingSpanId;
Profiler::instance()->recordQueueTime(tid, &event);
}

Expand Down
30 changes: 28 additions & 2 deletions ddprof-lib/src/main/cpp/jfrMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,32 @@ void JfrMetadata::initialize(
<< field("stackTrace", T_STACK_TRACE, "Stack Trace", F_CPOOL)
<< field("endpoint", T_STRING, "Endpoint", F_CPOOL)
<< field("operation", T_ATTRIBUTE_VALUE, "Operation", F_CPOOL)
<< field("localRootSpanId", T_LONG, "Local Root Span ID"))
<< field("localRootSpanId", T_LONG, "Local Root Span ID")
<< field("parentSpanId", T_LONG, "Parent Span ID"))

<< (type("datadog.TaskBlock", T_TASK_BLOCK, "Task Block")
<< category("Datadog")
<< field("startTime", T_LONG, "Start Time", F_TIME_TICKS)
<< field("duration", T_LONG, "Duration", F_DURATION_TICKS)
<< field("eventThread", T_THREAD, "Event Thread", F_CPOOL)
<< field("stackTrace", T_STACK_TRACE, "Stack Trace", F_CPOOL)
<< field("spanId", T_LONG, "Span ID")
<< field("localRootSpanId", T_LONG, "Local Root Span ID")
<< field("blocker", T_LONG, "Blocker Object Hash", F_UNSIGNED)
<< field("unblockingSpanId", T_LONG, "Unblocking Span ID"))

<< (type("datadog.SpanNode", T_SPAN_NODE, "Span Node")
<< category("Datadog")
<< field("startTime", T_LONG, "Start Time", F_TIME_TICKS)
<< field("duration", T_LONG, "Duration", F_DURATION_TICKS)
<< field("eventThread", T_THREAD, "Event Thread", F_CPOOL)
<< field("spanId", T_LONG, "Span ID")
<< field("parentSpanId", T_LONG, "Parent Span ID")
<< field("localRootSpanId", T_LONG, "Local Root Span ID")
<< field("startNanos", T_LONG, "Start Time (epoch ns)")
<< field("durationNanos", T_LONG, "Duration (ns)")
<< field("encodedOperation", T_INT, "Encoded Operation Name")
<< field("encodedResource", T_INT, "Encoded Resource Name"))

<< (type("datadog.QueueTime", T_QUEUE_TIME, "Queue Time")
<< category("Datadog")
Expand All @@ -189,7 +214,8 @@ void JfrMetadata::initialize(
<< field("queueType", T_CLASS, "Queue Type", F_CPOOL)
<< field("queueLength", T_INT, "Queue Length on Entry")
<< field("spanId", T_LONG, "Span ID")
<< field("localRootSpanId", T_LONG, "Local Root Span ID") ||
<< field("localRootSpanId", T_LONG, "Local Root Span ID")
<< field("submittingSpanId", T_LONG, "Submitting Span ID") ||
contextAttributes)

<< (type("datadog.HeapUsage", T_HEAP_USAGE, "JVM Heap Usage")
Expand Down
2 changes: 2 additions & 0 deletions ddprof-lib/src/main/cpp/jfrMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ enum JfrType {
T_DATADOG_CLASSREF_CACHE = 124,
T_DATADOG_COUNTER = 125,
T_UNWIND_FAILURE = 126,
T_TASK_BLOCK = 127,
T_SPAN_NODE = 128,
T_ANNOTATION = 200,
T_LABEL = 201,
T_CATEGORY = 202,
Expand Down
23 changes: 23 additions & 0 deletions ddprof-lib/src/main/cpp/profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,29 @@ void Profiler::recordQueueTime(int tid, QueueTimeEvent *event) {
_locks[lock_index].unlock();
}

void Profiler::recordTaskBlock(int tid, TaskBlockEvent *event) {
u32 lock_index = getLockIndex(tid);
if (!_locks[lock_index].tryLock() &&
!_locks[lock_index = (lock_index + 1) % CONCURRENCY_LEVEL].tryLock() &&
!_locks[lock_index = (lock_index + 2) % CONCURRENCY_LEVEL].tryLock()) {
return;
}
_jfr.recordTaskBlock(lock_index, tid, event);
_locks[lock_index].unlock();
}

void Profiler::recordSpanNode(int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId,
u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource) {
u32 lock_index = getLockIndex(tid);
if (!_locks[lock_index].tryLock() &&
!_locks[lock_index = (lock_index + 1) % CONCURRENCY_LEVEL].tryLock() &&
!_locks[lock_index = (lock_index + 2) % CONCURRENCY_LEVEL].tryLock()) {
return;
}
_jfr.recordSpanNode(lock_index, tid, spanId, parentSpanId, rootSpanId, startNanos, durationNanos, encodedOperation, encodedResource);
_locks[lock_index].unlock();
}

void Profiler::recordExternalSample(u64 weight, int tid, int num_frames,
ASGCT_CallFrame *frames, bool truncated,
jint event_type, Event *event) {
Expand Down
3 changes: 3 additions & 0 deletions ddprof-lib/src/main/cpp/profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ class alignas(alignof(SpinLock)) Profiler {
void recordWallClockEpoch(int tid, WallClockEpochEvent *event);
void recordTraceRoot(int tid, TraceRootEvent *event);
void recordQueueTime(int tid, QueueTimeEvent *event);
void recordTaskBlock(int tid, TaskBlockEvent *event);
void recordSpanNode(int tid, u64 spanId, u64 parentSpanId, u64 rootSpanId,
u64 startNanos, u64 durationNanos, u32 encodedOperation, u32 encodedResource);
void writeLog(LogLevel level, const char *message);
void writeLog(LogLevel level, const char *message, size_t len);
void writeDatadogProfilerSetting(int tid, int length, const char *name,
Expand Down
Loading
Loading