Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 103 additions & 42 deletions module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,33 @@ struct AsyncLocalStorageLookup {

// Structure to hold information for each thread/isolate
struct ThreadInfo {
// Thread name
// Mutex protecting this thread's mutable data (poll_state, last_seen)
// Does NOT protect async_store (immutable after creation)
mutable std::mutex mutex;
// Thread name (immutable after creation)
std::string thread_name;
// Last time this thread was seen in milliseconds since epoch
milliseconds last_seen;
milliseconds last_seen; // Protected by mutex
// Optional async local storage associated with this thread
std::optional<AsyncLocalStorageLookup> async_store;
// Using shared_ptr to safely share with async tasks even if ThreadInfo is
// erased
std::shared_ptr<std::optional<AsyncLocalStorageLookup>> async_store;
// Some JSON serialized state sent via threadPoll
std::string poll_state;
std::string poll_state; // Protected by mutex

// Constructor needed because std::mutex is not movable/copyable
ThreadInfo(std::string name, milliseconds seen,
std::shared_ptr<std::optional<AsyncLocalStorageLookup>> store,
std::string state)
: thread_name(std::move(name)), last_seen(seen),
async_store(std::move(store)), poll_state(std::move(state)) {}
};

static std::mutex threads_mutex;
// Separate mutexes for different concerns:
// - threads_map_mutex: protects the threads map structure
// (insert/erase/iteration)
// - ThreadInfo::mutex: protects each thread's mutable data
static std::mutex threads_map_mutex;
// Map to hold all registered threads and their information
static std::unordered_map<v8::Isolate *, ThreadInfo> threads = {};

Expand Down Expand Up @@ -316,7 +332,7 @@ std::string GetThreadState(Isolate *isolate,

struct InterruptArgs {
std::promise<JsStackTrace> promise;
const std::optional<AsyncLocalStorageLookup> *store;
std::shared_ptr<std::optional<AsyncLocalStorageLookup>> store;
};

// Function to be called when an isolate's execution is interrupted
Expand Down Expand Up @@ -346,9 +362,9 @@ static void ExecutionInterrupted(Isolate *isolate, void *data) {
}

// Function to capture the stack trace of a single isolate
JsStackTrace
CaptureStackTrace(Isolate *isolate,
const std::optional<AsyncLocalStorageLookup> &store) {
JsStackTrace CaptureStackTrace(
Isolate *isolate,
const std::shared_ptr<std::optional<AsyncLocalStorageLookup>> &store) {
if (isolate->IsExecutionTerminating()) {
return JsStackTrace{{}, ""};
}
Expand All @@ -357,8 +373,20 @@ CaptureStackTrace(Isolate *isolate,
auto future = promise.get_future();

// The v8 isolate must be interrupted to capture the stack trace
// Note: Even if we timeout below, the interrupt may still fire later.
// The InterruptArgs holds a shared_ptr to keep data alive until the callback
// executes.
isolate->RequestInterrupt(ExecutionInterrupted,
new InterruptArgs{std::move(promise), &store});
new InterruptArgs{std::move(promise), store});

// Wait with timeout to prevent infinite hang if isolate never processes
// interrupt (e.g., stuck in native code or terminating)
if (future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) {
// Timeout occurred. The InterruptArgs is intentionally leaked - it will be
// deleted by ExecutionInterrupted if/when the callback eventually fires.
// The shared_ptr keeps the store data alive.
return JsStackTrace{{}, ""};
}

return future.get();
}
Expand All @@ -369,34 +397,45 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {

std::vector<ThreadResult> results;

std::vector<std::future<ThreadResult>> futures;
{
std::vector<std::future<ThreadResult>> futures;
std::lock_guard<std::mutex> lock(threads_mutex);
// Only need map lock to safely iterate and copy shared_ptrs
// No deadlock risk because we release lock before fut.get()
std::lock_guard<std::mutex> lock(threads_map_mutex);
for (auto &thread : threads) {
auto thread_isolate = thread.first;
auto &thread_info = thread.second;

if (thread_isolate == capture_from_isolate)
continue;

// Copy immutable data and shared_ptrs (no individual thread lock needed)
// thread_name and async_store are immutable after creation
auto thread_name = thread_info.thread_name;
auto poll_state = thread_info.poll_state;
auto async_store_ptr = thread_info.async_store;

// For poll_state, we need to lock the thread's mutex briefly
std::string poll_state;
{
std::lock_guard<std::mutex> thread_lock(thread_info.mutex);
poll_state = thread_info.poll_state;
}

futures.emplace_back(std::async(
std::launch::async,
[thread_isolate, thread_name, poll_state](
const std::optional<AsyncLocalStorageLookup> &async_store)
-> ThreadResult {
return ThreadResult{thread_name,
CaptureStackTrace(thread_isolate, async_store),
poll_state};
},
std::cref(thread_info.async_store)));
[thread_isolate, thread_name, poll_state,
async_store_ptr]() -> ThreadResult {
return ThreadResult{
thread_name, CaptureStackTrace(thread_isolate, async_store_ptr),
poll_state};
}));
}
}

for (auto &fut : futures) {
results.emplace_back(fut.get());
}
// Wait for all futures to complete AFTER releasing the map lock.
// No deadlock: ThreadPoll takes threads_map_mutex only briefly for its lookup,
// and this function no longer holds that mutex while blocked in fut.get().
for (auto &fut : futures) {
results.emplace_back(fut.get());
}

auto current_context = capture_from_isolate->GetCurrentContext();
Expand Down Expand Up @@ -502,19 +541,25 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
// destroyed
void Cleanup(void *arg) {
auto isolate = static_cast<Isolate *>(arg);
std::lock_guard<std::mutex> lock(threads_mutex);
std::lock_guard<std::mutex> lock(threads_map_mutex);
threads.erase(isolate);
}

void RegisterThreadInternal(
Isolate *isolate, const std::string &thread_name,
std::optional<AsyncLocalStorageLookup> async_store) {

std::lock_guard<std::mutex> lock(threads_mutex);
auto found = threads.find(isolate);
if (found == threads.end()) {
threads.emplace(isolate, ThreadInfo{thread_name, milliseconds::zero(),
std::move(async_store), ""});
std::lock_guard<std::mutex> lock(threads_map_mutex);
// try_emplace constructs the ThreadInfo in-place if the key doesn't exist
// The mutex will be default-constructed automatically by ThreadInfo
// constructor
auto [iter, inserted] = threads.try_emplace(
isolate, thread_name, milliseconds::zero(),
std::make_shared<std::optional<AsyncLocalStorageLookup>>(
std::move(async_store)),
"");

if (inserted) {
// Register a cleanup hook to remove this thread when the isolate is
// destroyed
node::AddEnvironmentCleanupHook(isolate, Cleanup, isolate);
Expand Down Expand Up @@ -646,19 +691,28 @@ void ThreadPoll(const FunctionCallbackInfo<Value> &args) {
poll_state = JSONStringify(isolate, obj);
}

// First, look up this thread's entry while briefly holding the map mutex
// (the map may be modified concurrently by registration or cleanup).
// Then release it and lock only that specific thread's mutex for the update.
ThreadInfo *thread_info_ptr = nullptr;
{
std::lock_guard<std::mutex> lock(threads_mutex);
std::lock_guard<std::mutex> map_lock(threads_map_mutex);
auto found = threads.find(isolate);
if (found != threads.end()) {
auto &thread_info = found->second;
thread_info.poll_state = std::move(poll_state);
thread_info_ptr = &found->second;
}
}

if (enable_last_seen) {
thread_info.last_seen = duration_cast<milliseconds>(
GetUnbiasedMonotonicTime().time_since_epoch());
} else {
thread_info.last_seen = milliseconds::zero();
}
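// Update thread-specific data with only the thread's mutex held, so the
// update itself never holds the map mutex that CaptureStackTraces takes
// while iterating the map.
// NOTE(review): thread_info_ptr is dereferenced after the map mutex is
// released; this assumes the entry cannot be erased concurrently (presumably
// Cleanup only runs on this isolate's own thread) - confirm.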
if (thread_info_ptr != nullptr) {
std::lock_guard<std::mutex> thread_lock(thread_info_ptr->mutex);
thread_info_ptr->poll_state = std::move(poll_state);

if (enable_last_seen) {
thread_info_ptr->last_seen = duration_cast<milliseconds>(
GetUnbiasedMonotonicTime().time_since_epoch());
} else {
thread_info_ptr->last_seen = milliseconds::zero();
}
}
}
Expand All @@ -671,12 +725,19 @@ void GetThreadsLastSeen(const FunctionCallbackInfo<Value> &args) {
milliseconds now = duration_cast<milliseconds>(
GetUnbiasedMonotonicTime().time_since_epoch());
{
std::lock_guard<std::mutex> lock(threads_mutex);
std::lock_guard<std::mutex> map_lock(threads_map_mutex);
for (const auto &[thread_isolate, info] : threads) {
if (info.last_seen == milliseconds::zero())
// Lock each thread's mutex briefly to read last_seen
milliseconds last_seen;
{
std::lock_guard<std::mutex> thread_lock(info.mutex);
last_seen = info.last_seen;
}

if (last_seen == milliseconds::zero())
continue; // Skip threads that have not registered more than once

int64_t ms_since = (now - info.last_seen).count();
int64_t ms_since = (now - last_seen).count();
result
->Set(isolate->GetCurrentContext(),
String::NewFromUtf8(isolate, info.thread_name.c_str(),
Expand Down
Loading