diff --git a/.github/workflows/cpp.yml b/.github/workflows/cpp.yml index b2acdab3fd2a..c35ac5b8cb64 100644 --- a/.github/workflows/cpp.yml +++ b/.github/workflows/cpp.yml @@ -366,40 +366,55 @@ jobs: - name: Setup MSYS2 shell: msys2 {0} run: ci/scripts/msys2_setup.sh cpp - - name: Pin MSYS2 packages - # Temporary workaround for #49930: gcc 16 surfaces a cluster of 5 - # MINGW64 test failures. Pinning gcc-libs to 15.2 (and C++ packages - # rebuilt against gcc-libs 16.1, for ABI compatibility) avoids all - # of them. #49272/#49462 cover one (arrow-json-test); the other 4 - # (async-utility-test, threading-utility-test, dataset-writer-test - # `bad_weak_ptr`, dataset-file-test) need separate fixes — see #49930. - # Remove once all 5 pass on current upstream MSYS2 without these pins + - name: Patch MSYS2 GCC atomic word builtins + # Diagnostic workaround for GCC Bug #125312 / GH-49958: + # GCC r16-427's configure test for _GLIBCXX_ATOMIC_WORD_BUILTINS uses a POSIX-style + # absolute path the Windows GCC being built can't open. libstdc++ falls back + # to non-lock-free atomic refcount ops if: matrix.msystem_upper == 'MINGW64' shell: msys2 {0} run: | set -ex - base="https://repo.msys2.org/mingw/mingw64" - urls=( - "$base/mingw-w64-x86_64-gcc-libs-15.2.0-14-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-gcc-15.2.0-14-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-aws-crt-cpp-0.38.4-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-boost-libs-1.91.0-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-boost-1.91.0-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-ccache-4.13.2-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-clang-libs-22.1.4-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-clang-22.1.4-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-cmake-4.3.2-2-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-icu-78.3-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-llvm-libs-22.1.4-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-llvm-tools-22.1.4-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-llvm-22.1.4-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-tbb-2022.3.0-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-thrift-0.22.0-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-zstd-1.5.7-1-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-gflags-2.2.2-7-any.pkg.tar.zst" - "$base/mingw-w64-x86_64-aws-sdk-cpp-1.11.479-1-any.pkg.tar.zst" - ) - pacman -U --noconfirm "${urls[@]}" + g++ -v + pacman -Q "${MINGW_PACKAGE_PREFIX}-gcc" "${MINGW_PACKAGE_PREFIX}-gcc-libs" + + config_header="$( + find "${MINGW_PREFIX}/include/c++" \ + -path "*/${MINGW_CHOST}/bits/c++config.h" \ + -print \ + -quit + )" + test -n "${config_header}" + + grep -n "GLIBCXX_ATOMIC_WORD_BUILTINS" "${config_header}" || true + cp "${config_header}" "${config_header}.before-gh-49958" + + if grep -q "^#define _GLIBCXX_ATOMIC_WORD_BUILTINS 1" "${config_header}"; then + echo "_GLIBCXX_ATOMIC_WORD_BUILTINS already defined" + elif grep -q "^/\* #undef _GLIBCXX_ATOMIC_WORD_BUILTINS \*/" "${config_header}"; then + sed -i \ + 's|^/\* #undef _GLIBCXX_ATOMIC_WORD_BUILTINS \*/|#define _GLIBCXX_ATOMIC_WORD_BUILTINS 1|' \ + "${config_header}" + else + printf "\n#define _GLIBCXX_ATOMIC_WORD_BUILTINS 1\n" >> "${config_header}" + fi + + grep -n "GLIBCXX_ATOMIC_WORD_BUILTINS" "${config_header}" + - name: Run bad_weak_ptr repro + if: matrix.msystem_upper == 'MINGW64' + shell: msys2 {0} + run: | + set -ex + g++ \ + -std=gnu++20 \ + -O2 \ + -g \ + -Wall \ + -Wextra \ + -pthread \ + cpp/tools/gh-49958-mingw-bad-weak-ptr-repro.cc \ + -o gh-49958-mingw-bad-weak-ptr-repro.exe + ./gh-49958-mingw-bad-weak-ptr-repro.exe 30 - name: Cache ccache uses: actions/cache@v5 with: diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index f5104efb70b1..5744189d24df 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -17,6 +17,7 @@ #include "arrow/dataset/dataset_writer.h" +#include #include #include #include @@ -138,7 +139,31 @@ class DatasetWriterFileQueue explicit DatasetWriterFileQueue(const std::shared_ptr& schema, const FileSystemDatasetWriteOptions& options, std::shared_ptr writer_state) - : options_(options), schema_(schema), writer_state_(std::move(writer_state)) {} + : options_(options), schema_(schema), writer_state_(std::move(writer_state)) { + std::fprintf(stderr, "GH-49958: +DatasetWriterFileQueue this=%p\n", + static_cast(this)); + std::fflush(stderr); + } + + // TEMP DIAG (3): did the object actually get destroyed under us? + ~DatasetWriterFileQueue() { + std::fprintf(stderr, "GH-49958: ~DatasetWriterFileQueue this=%p\n", + static_cast(this)); + std::fflush(stderr); + } + + // TEMP DEBUG: log shared_from_this() callsite throwing bad_weak_ptr + std::shared_ptr SfwDbg(int line) { + try { + return shared_from_this(); + } catch (const std::bad_weak_ptr&) { + std::fprintf(stderr, + "GH-49958: bad_weak_ptr DatasetWriterFileQueue dataset_writer.cc:%d\n", + line); + std::fflush(stderr); + throw; + } + } void Start(std::unique_ptr file_tasks, std::string filename) { @@ -146,7 +171,7 @@ class DatasetWriterFileQueue // Because the scheduler runs one task at a time we know the writer will // be opened before any attempt to write file_tasks_->AddSimpleTask( - [self = shared_from_this(), filename = std::move(filename)] { + [self = SfwDbg(__LINE__), filename = std::move(filename)] { Executor* io_executor = self->options_.filesystem->io_context().executor(); return DeferNotOk(io_executor->Submit([self, filename = std::move(filename)]() { ARROW_ASSIGN_OR_RAISE(self->writer_, @@ -194,8 +219,14 @@ class DatasetWriterFileQueue } void ScheduleBatch(std::shared_ptr batch) { + // TEMP DIAG (2): control block already gone (use-after-free) by here? + if (auto wp = weak_from_this(); wp.expired()) { + std::fprintf(stderr, "GH-49958: ScheduleBatch this=%p EXPIRED use_count=%d\n", + static_cast(this), static_cast(wp.use_count())); + std::fflush(stderr); + } file_tasks_->AddSimpleTask( - [self = shared_from_this(), batch = std::move(batch)]() { + [self = SfwDbg(__LINE__), batch = std::move(batch)]() { return self->WriteNext(std::move(batch)); }, "DatasetWriter::WriteBatch"sv); @@ -234,7 +265,7 @@ class DatasetWriterFileQueue // At this point all write tasks have been added. Because the scheduler // is a 1-task FIFO we know this task will run at the very end and can // add it now. - file_tasks_->AddSimpleTask([self = shared_from_this()] { return self->DoFinish(); }, + file_tasks_->AddSimpleTask([self = SfwDbg(__LINE__)] { return self->DoFinish(); }, "DatasetWriter::FinishFile"sv); file_tasks_.reset(); return Status::OK(); @@ -244,7 +275,7 @@ class DatasetWriterFileQueue Future<> WriteNext(std::shared_ptr next) { // May want to prototype / measure someday pushing the async write down further return DeferNotOk(options_.filesystem->io_context().executor()->Submit( - [self = shared_from_this(), batch = std::move(next)]() { + [self = SfwDbg(__LINE__), batch = std::move(next)]() { int64_t rows_to_release = batch->num_rows(); Status status = self->writer_->Write(batch); self->writer_state_->rows_in_flight_throttle.Release(rows_to_release); @@ -258,7 +289,7 @@ class DatasetWriterFileQueue RETURN_NOT_OK(options_.writer_pre_finish(writer_.get())); } return writer_->Finish().Then( - [self = shared_from_this(), writer_post_finish = options_.writer_post_finish]() { + [self = SfwDbg(__LINE__), writer_post_finish = options_.writer_post_finish]() { std::lock_guard lg(self->writer_state_->visitors_mutex); return writer_post_finish(self->writer_.get()); }); @@ -300,6 +331,20 @@ class DatasetWriterDirectoryQueue } } + // TEMP DEBUG: log shared_from_this() callsite throwing bad_weak_ptr + std::shared_ptr SfwDbg(int line) { + try { + return shared_from_this(); + } catch (const std::bad_weak_ptr&) { + std::fprintf( + stderr, + "GH-49958: bad_weak_ptr DatasetWriterDirectoryQueue dataset_writer.cc:%d\n", + line); + std::fflush(stderr); + throw; + } + } + Result> NextWritableChunk( std::shared_ptr batch, std::shared_ptr* remainder, bool* will_open_file) const { @@ -359,9 +404,10 @@ class DatasetWriterDirectoryQueue } Status OpenFileQueue(const std::string& filename) { - latest_open_file_.reset( - new DatasetWriterFileQueue(schema_, write_options_, writer_state_)); - auto file_finish_task = [self = shared_from_this()] { + // TEMP DIAG (1): make_shared instead of reset to wire _M_weak_this + latest_open_file_ = + std::make_shared(schema_, write_options_, writer_state_); + auto file_finish_task = [self = SfwDbg(__LINE__)] { self->writer_state_->open_files_throttle.Release(1); return Status::OK(); }; diff --git a/cpp/tools/gh-49958-mingw-bad-weak-ptr-repro.cc b/cpp/tools/gh-49958-mingw-bad-weak-ptr-repro.cc new file mode 100644 index 000000000000..bcdc7622907a --- /dev/null +++ b/cpp/tools/gh-49958-mingw-bad-weak-ptr-repro.cc @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Minimal standalone repro for MinGW gcc 16.1 shared_ptr/weak_ptr regression. +// Build (MSYS2 MINGW64): +// g++ -std=gnu++20 -O2 -g -pthread gh-49958-mingw-bad-weak-ptr-repro.cc -o repro +// Run (argument in seconds): +// ./repro 30 + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + +struct State : public std::enable_shared_from_this { + void Check() { + check_enter.fetch_add(1, std::memory_order_relaxed); + stage.store(5, std::memory_order_relaxed); // in shared_from_this + auto self = shared_from_this(); + (void)self; + check_ok.fetch_add(1, std::memory_order_relaxed); + stage.store(6, std::memory_order_relaxed); // shared_from_this returned + } + + static std::atomic check_enter; + static std::atomic check_ok; + static std::atomic stage; +}; + +std::atomic State::check_enter{0}; +std::atomic State::check_ok{0}; +std::atomic State::stage{0}; + +std::atomic lock_ok{0}; +std::atomic lock_fail{0}; +std::atomic checks{0}; +std::atomic saw_bad_weak_ptr{false}; +std::atomic saw_impossible_expired{false}; + +void Dump(const char* tag) { + std::fprintf(stderr, + "%s: stage=%d checks=%" PRIu64 " lock_ok=%" PRIu64 " lock_fail=%" PRIu64 + " check_enter=%" PRIu64 " check_ok=%" PRIu64 + " bad_weak_ptr=%d impossible_expired=%d\n", + tag, State::stage.load(std::memory_order_relaxed), + checks.load(std::memory_order_relaxed), + lock_ok.load(std::memory_order_relaxed), + lock_fail.load(std::memory_order_relaxed), + State::check_enter.load(std::memory_order_relaxed), + State::check_ok.load(std::memory_order_relaxed), + saw_bad_weak_ptr.load(std::memory_order_relaxed) ? 1 : 0, + saw_impossible_expired.load(std::memory_order_relaxed) ? 1 : 0); +} + +void OnSignal(int sig) { + std::fprintf(stderr, "caught signal %d\n", sig); + Dump("crash_state"); + std::_Exit(128 + sig); +} + +} // namespace + +int main(int argc, char** argv) { + std::setvbuf(stderr, nullptr, _IONBF, 0); + std::signal(SIGSEGV, OnSignal); + std::signal(SIGABRT, OnSignal); + + int run_seconds = 10; + if (argc > 1) { + run_seconds = std::max(1, std::atoi(argv[1])); + } + + constexpr int kThreads = 32; + + auto owner = std::make_shared(); + std::weak_ptr weak = owner; + + std::atomic stop{false}; + std::atomic owner_is_alive{true}; + + auto worker = [&] { + auto weak_local = weak; + while (!stop.load(std::memory_order_relaxed)) { + State::stage.store(1, std::memory_order_relaxed); // loop + auto sp = weak_local.lock(); + if (!sp) { + State::stage.store(3, std::memory_order_relaxed); // lock failed + lock_fail.fetch_add(1, std::memory_order_relaxed); + if (owner_is_alive.load(std::memory_order_relaxed)) { + saw_impossible_expired.store(true, std::memory_order_relaxed); + stop.store(true, std::memory_order_relaxed); + } + continue; + } + lock_ok.fetch_add(1, std::memory_order_relaxed); + State::stage.store(2, std::memory_order_relaxed); // sp obtained + + try { + State::stage.store(4, std::memory_order_relaxed); // before Check + sp->Check(); + checks.fetch_add(1, std::memory_order_relaxed); + } catch (const std::bad_weak_ptr&) { + saw_bad_weak_ptr.store(true, std::memory_order_relaxed); + stop.store(true, std::memory_order_relaxed); + return; + } + + std::shared_ptr copies[8] = {sp, sp, sp, sp, sp, sp, sp, sp}; + (void)copies; + } + }; + + std::vector threads; + threads.reserve(kThreads); + for (int i = 0; i < kThreads; ++i) { + threads.emplace_back(worker); + } + + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(run_seconds); + while (std::chrono::steady_clock::now() < deadline && + !stop.load(std::memory_order_relaxed)) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + stop.store(true, std::memory_order_relaxed); + for (auto& t : threads) { + t.join(); + } + + owner_is_alive.store(false, std::memory_order_relaxed); + owner.reset(); + + Dump("done"); + + if (saw_bad_weak_ptr.load(std::memory_order_relaxed)) { + std::fprintf(stderr, "REPRODUCED: unexpected std::bad_weak_ptr while object alive\n"); + return 3; + } + if (saw_impossible_expired.load(std::memory_order_relaxed)) { + std::fprintf( + stderr, "REPRODUCED: weak_ptr.lock() failed while owning shared_ptr was alive\n"); + return 2; + } + + std::fprintf(stderr, "No failure observed in this run\n"); + return 0; +}