From 6248bd89b7d6805e6215c20fd114eb3f69a05594 Mon Sep 17 00:00:00 2001 From: vanshaj2023 Date: Mon, 16 Feb 2026 17:50:58 +0530 Subject: [PATCH] GH-49272: [C++][CI] Fix intermittent segfault in arrow-json-test on MinGW --- cpp/src/arrow/json/reader_test.cc | 45 +++++++++++++++++++++++++++++++ cpp/src/arrow/util/task_group.cc | 4 +-- cpp/src/arrow/util/thread_pool.cc | 29 ++++++++++++++++++-- 3 files changed, 74 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/json/reader_test.cc b/cpp/src/arrow/json/reader_test.cc index a52626413d6..8a01820e7cb 100644 --- a/cpp/src/arrow/json/reader_test.cc +++ b/cpp/src/arrow/json/reader_test.cc @@ -34,6 +34,25 @@ #include "arrow/type_fwd.h" #include "arrow/util/vector.h" +// On MinGW, install a crash handler to log the exception address for debugging +// intermittent segfaults. See https://github.com/apache/arrow/issues/49272 +#if defined(__MINGW32__) || defined(__MINGW64__) +# include <windows.h> +# include <cstdio> +namespace { +LONG WINAPI ArrowCrashHandler(EXCEPTION_POINTERS* ep) { + fprintf(stderr, "CRASH: code=0x%lx addr=%p\n", + ep->ExceptionRecord->ExceptionCode, + ep->ExceptionRecord->ExceptionAddress); + fflush(stderr); + return EXCEPTION_CONTINUE_SEARCH; +} +struct CrashHandlerInstaller { + CrashHandlerInstaller() { SetUnhandledExceptionFilter(ArrowCrashHandler); } +} g_crash_handler; +} // namespace +#endif + namespace arrow { namespace json { @@ -290,6 +309,32 @@ TEST(ReaderTest, MultipleChunksParallel) { AssertTablesEqual(*serial, *threaded); } +// Stress test for intermittent threading crashes on MinGW. 
+// See https://github.com/apache/arrow/issues/49272 +TEST(ReaderTest, MultipleChunksParallelStress) { + constexpr int kTrials = 20; + for (int trial = 0; trial < kTrials; ++trial) { + int64_t count = 1 << 10; + ParseOptions parse_options; + parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType; + ReadOptions read_options; + read_options.block_size = static_cast<int32_t>(count / 2); + read_options.use_threads = true; + + std::string json; + for (int i = 0; i < count; ++i) { + json += "{\"a\":" + std::to_string(i) + "}\n"; + } + + std::shared_ptr<io::InputStream> input; + ASSERT_OK(MakeStream(json, &input)); + ASSERT_OK_AND_ASSIGN(auto reader, TableReader::Make(default_memory_pool(), input, + read_options, parse_options)); + ASSERT_OK_AND_ASSIGN(auto table, reader->Read()); + ASSERT_EQ(table->num_rows(), count); + } +} + TEST(ReaderTest, ListArrayWithFewValues) { // ARROW-7647 ParseOptions parse_options; diff --git a/cpp/src/arrow/util/task_group.cc b/cpp/src/arrow/util/task_group.cc index 27cd1ba3971..980ca84de83 100644 --- a/cpp/src/arrow/util/task_group.cc +++ b/cpp/src/arrow/util/task_group.cc @@ -99,7 +99,7 @@ class ThreadedTaskGroup : public TaskGroup { // The hot path is unlocked thanks to atomics // Only if an error occurs is the lock taken if (ok_.load(std::memory_order_acquire)) { - nremaining_.fetch_add(1, std::memory_order_acquire); + nremaining_.fetch_add(1, std::memory_order_acq_rel); auto self = checked_pointer_cast<ThreadedTaskGroup>(shared_from_this()); @@ -176,7 +176,7 @@ class ThreadedTaskGroup : public TaskGroup { void OneTaskDone() { // Can be called unlocked thanks to atomics - auto nremaining = nremaining_.fetch_sub(1, std::memory_order_release) - 1; + auto nremaining = nremaining_.fetch_sub(1, std::memory_order_acq_rel) - 1; DCHECK_GE(nremaining, 0); if (nremaining == 0) { // Take the lock so that ~ThreadedTaskGroup cannot destroy cv diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index bf107006f8b..7f8806a38a1 100644 --- 
a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -630,9 +630,34 @@ void ThreadPool::CollectFinishedWorkersUnlocked() { state_->finished_workers_.clear(); } +// MinGW's __emutls implementation for C++ thread_local has known race conditions +// during thread creation that can cause segfaults. Use native Win32 TLS instead. +// See https://github.com/apache/arrow/issues/49272 +#if defined(__MINGW32__) || defined(__MINGW64__) +# include <windows.h> + +namespace { +DWORD GetPoolTlsIndex() { + static DWORD index = TlsAlloc(); + return index; +} +} // namespace + +static ThreadPool* GetCurrentThreadPool() { + return static_cast<ThreadPool*>(TlsGetValue(GetPoolTlsIndex())); +} + +static void SetCurrentThreadPool(ThreadPool* pool) { + TlsSetValue(GetPoolTlsIndex(), pool); +} +#else thread_local ThreadPool* current_thread_pool_ = nullptr; -bool ThreadPool::OwnsThisThread() { return current_thread_pool_ == this; } +static ThreadPool* GetCurrentThreadPool() { return current_thread_pool_; } +static void SetCurrentThreadPool(ThreadPool* pool) { current_thread_pool_ = pool; } +#endif + +bool ThreadPool::OwnsThisThread() { return GetCurrentThreadPool() == this; } void ThreadPool::LaunchWorkersUnlocked(int threads) { std::shared_ptr<State> state = sp_state_; @@ -641,7 +666,7 @@ void ThreadPool::LaunchWorkersUnlocked(int threads) { state_->workers_.emplace_back(); auto it = --(state_->workers_.end()); *it = std::thread([this, state, it] { - SetCurrentThreadPool(this); WorkerLoop(state, it); }); }