diff --git a/cpp/src/arrow/json/reader_test.cc b/cpp/src/arrow/json/reader_test.cc
index a52626413d6..9450ddc50f8 100644
--- a/cpp/src/arrow/json/reader_test.cc
+++ b/cpp/src/arrow/json/reader_test.cc
@@ -34,6 +34,24 @@
 #include "arrow/type_fwd.h"
 #include "arrow/util/vector.h"
 
+// On MinGW, install a crash handler to log the exception address for debugging
+// intermittent segfaults. See https://github.com/apache/arrow/issues/49272
+#if defined(__MINGW32__) || defined(__MINGW64__)
+#  include <windows.h>
+#  include <cstdio>
+namespace {
+LONG WINAPI ArrowCrashHandler(EXCEPTION_POINTERS* ep) {
+  fprintf(stderr, "CRASH: code=0x%lx addr=%p\n", ep->ExceptionRecord->ExceptionCode,
+          ep->ExceptionRecord->ExceptionAddress);
+  fflush(stderr);
+  return EXCEPTION_CONTINUE_SEARCH;
+}
+struct CrashHandlerInstaller {
+  CrashHandlerInstaller() { SetUnhandledExceptionFilter(ArrowCrashHandler); }
+} g_crash_handler;
+}  // namespace
+#endif
+
 namespace arrow {
 namespace json {
 
@@ -290,6 +308,32 @@ TEST(ReaderTest, MultipleChunksParallel) {
   AssertTablesEqual(*serial, *threaded);
 }
 
+// Stress test for intermittent threading crashes on MinGW.
+// See https://github.com/apache/arrow/issues/49272
+TEST(ReaderTest, MultipleChunksParallelStress) {
+  constexpr int kTrials = 20;
+  for (int trial = 0; trial < kTrials; ++trial) {
+    int64_t count = 1 << 10;
+    ParseOptions parse_options;
+    parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+    ReadOptions read_options;
+    read_options.block_size = static_cast<int>(count / 2);
+    read_options.use_threads = true;
+
+    std::string json;
+    for (int i = 0; i < count; ++i) {
+      json += "{\"a\":" + std::to_string(i) + "}\n";
+    }
+
+    std::shared_ptr<io::InputStream> input;
+    ASSERT_OK(MakeStream(json, &input));
+    ASSERT_OK_AND_ASSIGN(auto reader, TableReader::Make(default_memory_pool(), input,
+                                                        read_options, parse_options));
+    ASSERT_OK_AND_ASSIGN(auto table, reader->Read());
+    ASSERT_EQ(table->num_rows(), count);
+  }
+}
+
 TEST(ReaderTest, ListArrayWithFewValues) {
   // ARROW-7647
   ParseOptions parse_options;
diff --git a/cpp/src/arrow/util/task_group.cc b/cpp/src/arrow/util/task_group.cc
index 27cd1ba3971..980ca84de83 100644
--- a/cpp/src/arrow/util/task_group.cc
+++ b/cpp/src/arrow/util/task_group.cc
@@ -99,7 +99,7 @@ class ThreadedTaskGroup : public TaskGroup {
     // The hot path is unlocked thanks to atomics
     // Only if an error occurs is the lock taken
     if (ok_.load(std::memory_order_acquire)) {
-      nremaining_.fetch_add(1, std::memory_order_acquire);
+      nremaining_.fetch_add(1, std::memory_order_acq_rel);
 
       auto self = checked_pointer_cast<ThreadedTaskGroup>(shared_from_this());
 
@@ -176,7 +176,7 @@ class ThreadedTaskGroup : public TaskGroup {
 
   void OneTaskDone() {
     // Can be called unlocked thanks to atomics
-    auto nremaining = nremaining_.fetch_sub(1, std::memory_order_release) - 1;
+    auto nremaining = nremaining_.fetch_sub(1, std::memory_order_acq_rel) - 1;
     DCHECK_GE(nremaining, 0);
     if (nremaining == 0) {
       // Take the lock so that ~ThreadedTaskGroup cannot destroy cv
diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc
index bf107006f8b..521a9d6f156 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -630,9 +630,36 @@ void ThreadPool::CollectFinishedWorkersUnlocked() {
   state_->finished_workers_.clear();
 }
 
+// MinGW's __emutls implementation for C++ thread_local has known race conditions
+// during thread creation that can cause segfaults. Use native Win32 TLS instead.
+// See https://github.com/apache/arrow/issues/49272
+#if defined(__MINGW32__) || defined(__MINGW64__)
+#  include <windows.h>
+
+namespace {
+DWORD GetPoolTlsIndex() {
+  // NOTE(review): TlsAlloc() returns TLS_OUT_OF_INDEXES on failure and that result
+  // is not checked here; confirm whether a failed allocation should be fatal.
+  static DWORD index = TlsAlloc();
+  return index;
+}
+}  // namespace
+
+static ThreadPool* GetCurrentThreadPool() {
+  return static_cast<ThreadPool*>(TlsGetValue(GetPoolTlsIndex()));
+}
+
+static void SetCurrentThreadPool(ThreadPool* pool) {
+  TlsSetValue(GetPoolTlsIndex(), pool);
+}
+#else
 thread_local ThreadPool* current_thread_pool_ = nullptr;
 
-bool ThreadPool::OwnsThisThread() { return current_thread_pool_ == this; }
+static ThreadPool* GetCurrentThreadPool() { return current_thread_pool_; }
+static void SetCurrentThreadPool(ThreadPool* pool) { current_thread_pool_ = pool; }
+#endif
+
+bool ThreadPool::OwnsThisThread() { return GetCurrentThreadPool() == this; }
 
 void ThreadPool::LaunchWorkersUnlocked(int threads) {
   std::shared_ptr<State> state = sp_state_;
@@ -641,7 +668,7 @@ void ThreadPool::LaunchWorkersUnlocked(int threads) {
     state_->workers_.emplace_back();
     auto it = --(state_->workers_.end());
     *it = std::thread([this, state, it] {
-      current_thread_pool_ = this;
+      SetCurrentThreadPool(this);
       WorkerLoop(state, it);
     });
   }