Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions cpp/src/arrow/json/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@
#include "arrow/type_fwd.h"
#include "arrow/util/vector.h"

// On MinGW, install a crash handler to log the exception address for debugging
// intermittent segfaults. See https://github.com/apache/arrow/issues/49272
#if defined(__MINGW32__) || defined(__MINGW64__)
# include <windows.h>
# include <cstdio>
namespace {

// Unhandled-exception filter used for crash diagnostics: writes the exception
// code and the faulting address to stderr, flushes, and then returns
// EXCEPTION_CONTINUE_SEARCH so that the handler search continues as usual.
LONG WINAPI ArrowCrashHandler(EXCEPTION_POINTERS* info) {
  const EXCEPTION_RECORD* record = info->ExceptionRecord;
  fprintf(stderr, "CRASH: code=0x%lx addr=%p\n", record->ExceptionCode,
          record->ExceptionAddress);
  fflush(stderr);
  return EXCEPTION_CONTINUE_SEARCH;
}

// Installs ArrowCrashHandler from its constructor; the single file-local
// instance below triggers the installation when this translation unit's
// globals are initialized.
struct CrashHandlerInstaller {
  CrashHandlerInstaller() { SetUnhandledExceptionFilter(ArrowCrashHandler); }
};

CrashHandlerInstaller g_crash_handler;

}  // namespace
#endif

namespace arrow {
namespace json {

Expand Down Expand Up @@ -290,6 +308,32 @@ TEST(ReaderTest, MultipleChunksParallel) {
AssertTablesEqual(*serial, *threaded);
}

// Stress test for intermittent threading crashes on MinGW.
// See https://github.com/apache/arrow/issues/49272
//
// Runs the threaded table reader kTrials times over the same newline-delimited
// JSON payload and checks that every run yields exactly kNumRows rows.
TEST(ReaderTest, MultipleChunksParallelStress) {
  constexpr int kTrials = 20;
  constexpr int64_t kNumRows = 1 << 10;

  // The options and the payload are identical for every trial, so they are
  // built once here instead of being rebuilt on each iteration.
  ParseOptions parse_options;
  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
  ReadOptions read_options;
  read_options.block_size = static_cast<int>(kNumRows / 2);
  read_options.use_threads = true;

  // One object per line: {"a":0}, {"a":1}, ...
  std::string json;
  for (int64_t i = 0; i < kNumRows; ++i) {
    json += "{\"a\":" + std::to_string(i) + "}\n";
  }

  for (int trial = 0; trial < kTrials; ++trial) {
    // A fresh stream and reader per trial; `json` outlives both of them.
    std::shared_ptr<io::InputStream> input;
    ASSERT_OK(MakeStream(json, &input));
    ASSERT_OK_AND_ASSIGN(auto reader, TableReader::Make(default_memory_pool(), input,
                                                        read_options, parse_options));
    ASSERT_OK_AND_ASSIGN(auto table, reader->Read());
    ASSERT_EQ(table->num_rows(), kNumRows);
  }
}

TEST(ReaderTest, ListArrayWithFewValues) {
// ARROW-7647
ParseOptions parse_options;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/util/task_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class ThreadedTaskGroup : public TaskGroup {
// The hot path is unlocked thanks to atomics
// Only if an error occurs is the lock taken
if (ok_.load(std::memory_order_acquire)) {
nremaining_.fetch_add(1, std::memory_order_acquire);
nremaining_.fetch_add(1, std::memory_order_acq_rel);

auto self = checked_pointer_cast<ThreadedTaskGroup>(shared_from_this());

Expand Down Expand Up @@ -176,7 +176,7 @@ class ThreadedTaskGroup : public TaskGroup {

void OneTaskDone() {
// Can be called unlocked thanks to atomics
auto nremaining = nremaining_.fetch_sub(1, std::memory_order_release) - 1;
auto nremaining = nremaining_.fetch_sub(1, std::memory_order_acq_rel) - 1;
DCHECK_GE(nremaining, 0);
if (nremaining == 0) {
// Take the lock so that ~ThreadedTaskGroup cannot destroy cv
Expand Down
29 changes: 27 additions & 2 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -630,9 +630,34 @@ void ThreadPool::CollectFinishedWorkersUnlocked() {
state_->finished_workers_.clear();
}

// MinGW's __emutls implementation for C++ thread_local has known race conditions
// during thread creation that can cause segfaults. Use native Win32 TLS instead.
// See https://github.com/apache/arrow/issues/49272
# if defined(__MINGW32__) || defined(__MINGW64__)
# include <windows.h>

namespace {
DWORD GetPoolTlsIndex() {
static DWORD index = TlsAlloc();
return index;
}
} // namespace

static ThreadPool* GetCurrentThreadPool() {
return static_cast<ThreadPool*>(TlsGetValue(GetPoolTlsIndex()));
}

static void SetCurrentThreadPool(ThreadPool* pool) {
TlsSetValue(GetPoolTlsIndex(), pool);
}
# else
thread_local ThreadPool* current_thread_pool_ = nullptr;

bool ThreadPool::OwnsThisThread() { return current_thread_pool_ == this; }
// Returns the value of the thread_local current_thread_pool_ for the calling
// thread.
static ThreadPool* GetCurrentThreadPool() {
  ThreadPool* const owner = current_thread_pool_;
  return owner;
}
// Stores `pool` in the thread_local current_thread_pool_ for the calling
// thread.
static void SetCurrentThreadPool(ThreadPool* pool) {
  current_thread_pool_ = pool;
}
# endif

bool ThreadPool::OwnsThisThread() { return GetCurrentThreadPool() == this; }

void ThreadPool::LaunchWorkersUnlocked(int threads) {
std::shared_ptr<State> state = sp_state_;
Expand All @@ -641,7 +666,7 @@ void ThreadPool::LaunchWorkersUnlocked(int threads) {
state_->workers_.emplace_back();
auto it = --(state_->workers_.end());
*it = std::thread([this, state, it] {
current_thread_pool_ = this;
SetCurrentThreadPool(this);
WorkerLoop(state, it);
});
}
Expand Down