Skip to content
Draft
75 changes: 45 additions & 30 deletions .github/workflows/cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -366,40 +366,55 @@ jobs:
- name: Setup MSYS2
shell: msys2 {0}
run: ci/scripts/msys2_setup.sh cpp
- name: Pin MSYS2 packages
# Temporary workaround for #49930: gcc 16 surfaces a cluster of 5
# MINGW64 test failures. Pinning gcc-libs to 15.2 (and C++ packages
# rebuilt against gcc-libs 16.1, for ABI compatibility) avoids all
# of them. #49272/#49462 cover one (arrow-json-test); the other 4
# (async-utility-test, threading-utility-test, dataset-writer-test
# `bad_weak_ptr`, dataset-file-test) need separate fixes — see #49930.
# Remove once all 5 pass on current upstream MSYS2 without these pins
- name: Patch MSYS2 GCC atomic word builtins
# Diagnostic workaround for GCC Bug #125312 / GH-49958:
# GCC r16-427's configure test for _GLIBCXX_ATOMIC_WORD_BUILTINS uses a POSIX-style
# absolute path that the Windows GCC being built can't open, so libstdc++ falls
# back to non-lock-free atomic refcount ops. This step forces
# _GLIBCXX_ATOMIC_WORD_BUILTINS to 1 in the installed c++config.h.
if: matrix.msystem_upper == 'MINGW64'
shell: msys2 {0}
run: |
set -ex
base="https://repo.msys2.org/mingw/mingw64"
urls=(
"$base/mingw-w64-x86_64-gcc-libs-15.2.0-14-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-gcc-15.2.0-14-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-aws-crt-cpp-0.38.4-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-boost-libs-1.91.0-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-boost-1.91.0-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-ccache-4.13.2-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-clang-libs-22.1.4-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-clang-22.1.4-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-cmake-4.3.2-2-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-icu-78.3-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-llvm-libs-22.1.4-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-llvm-tools-22.1.4-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-llvm-22.1.4-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-tbb-2022.3.0-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-thrift-0.22.0-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-zstd-1.5.7-1-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-gflags-2.2.2-7-any.pkg.tar.zst"
"$base/mingw-w64-x86_64-aws-sdk-cpp-1.11.479-1-any.pkg.tar.zst"
)
pacman -U --noconfirm "${urls[@]}"
g++ -v
pacman -Q "${MINGW_PACKAGE_PREFIX}-gcc" "${MINGW_PACKAGE_PREFIX}-gcc-libs"

config_header="$(
find "${MINGW_PREFIX}/include/c++" \
-path "*/${MINGW_CHOST}/bits/c++config.h" \
-print \
-quit
)"
test -n "${config_header}"

grep -n "GLIBCXX_ATOMIC_WORD_BUILTINS" "${config_header}" || true
cp "${config_header}" "${config_header}.before-gh-49958"

if grep -q "^#define _GLIBCXX_ATOMIC_WORD_BUILTINS 1" "${config_header}"; then
echo "_GLIBCXX_ATOMIC_WORD_BUILTINS already defined"
elif grep -q "^/\* #undef _GLIBCXX_ATOMIC_WORD_BUILTINS \*/" "${config_header}"; then
sed -i \
's|^/\* #undef _GLIBCXX_ATOMIC_WORD_BUILTINS \*/|#define _GLIBCXX_ATOMIC_WORD_BUILTINS 1|' \
"${config_header}"
else
printf "\n#define _GLIBCXX_ATOMIC_WORD_BUILTINS 1\n" >> "${config_header}"
fi

grep -n "GLIBCXX_ATOMIC_WORD_BUILTINS" "${config_header}"
- name: Run bad_weak_ptr repro
# Diagnostic step for GH-49958: compiles the standalone repro under cpp/tools
# with g++ and runs it, passing 30 as its single argument (the repro's run
# time in seconds). Only executed for the MINGW64 matrix entry. Because the
# script uses `set -e`, a non-zero exit status from the compiler or from the
# repro fails the step.
if: matrix.msystem_upper == 'MINGW64'
shell: msys2 {0}
run: |
set -ex
g++ \
-std=gnu++20 \
-O2 \
-g \
-Wall \
-Wextra \
-pthread \
cpp/tools/gh-49958-mingw-bad-weak-ptr-repro.cc \
-o gh-49958-mingw-bad-weak-ptr-repro.exe
./gh-49958-mingw-bad-weak-ptr-repro.exe 30
- name: Cache ccache
uses: actions/cache@v5
with:
Expand Down
64 changes: 55 additions & 9 deletions cpp/src/arrow/dataset/dataset_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "arrow/dataset/dataset_writer.h"

#include <cstdio>
#include <deque>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -138,15 +139,39 @@ class DatasetWriterFileQueue
explicit DatasetWriterFileQueue(const std::shared_ptr<Schema>& schema,
const FileSystemDatasetWriteOptions& options,
std::shared_ptr<DatasetWriterState> writer_state)
: options_(options), schema_(schema), writer_state_(std::move(writer_state)) {}
: options_(options), schema_(schema), writer_state_(std::move(writer_state)) {
std::fprintf(stderr, "GH-49958: +DatasetWriterFileQueue this=%p\n",
static_cast<void*>(this));
std::fflush(stderr);
}

// TEMP DIAG (3) for GH-49958: trace destruction of this queue on stderr so the
// log shows whether the object was destroyed before a later use of the same
// address. The stream is flushed explicitly after the message is written.
~DatasetWriterFileQueue() {
  void* const self_addr = static_cast<void*>(this);
  std::fprintf(stderr, "GH-49958: ~DatasetWriterFileQueue this=%p\n", self_addr);
  std::fflush(stderr);
}

// TEMP DEBUG (GH-49958): wrapper around shared_from_this() that names the
// calling source line on stderr when std::bad_weak_ptr is thrown, then rethrows
// so the caller observes the same exception as a direct call would produce.
//
// \param line source line of the call site (call sites pass __LINE__)
// \return the shared_ptr obtained from shared_from_this()
std::shared_ptr<DatasetWriterFileQueue> SfwDbg(int line) try {
  return shared_from_this();
} catch (const std::bad_weak_ptr&) {
  // Function parameters remain in scope inside a function-try-block handler.
  std::fprintf(stderr,
               "GH-49958: bad_weak_ptr DatasetWriterFileQueue dataset_writer.cc:%d\n",
               line);
  std::fflush(stderr);
  throw;
}

void Start(std::unique_ptr<util::ThrottledAsyncTaskScheduler> file_tasks,
std::string filename) {
file_tasks_ = std::move(file_tasks);
// Because the scheduler runs one task at a time we know the writer will
// be opened before any attempt to write
file_tasks_->AddSimpleTask(
[self = shared_from_this(), filename = std::move(filename)] {
[self = SfwDbg(__LINE__), filename = std::move(filename)] {
Executor* io_executor = self->options_.filesystem->io_context().executor();
return DeferNotOk(io_executor->Submit([self, filename = std::move(filename)]() {
ARROW_ASSIGN_OR_RAISE(self->writer_,
Expand Down Expand Up @@ -194,8 +219,14 @@ class DatasetWriterFileQueue
}

void ScheduleBatch(std::shared_ptr<RecordBatch> batch) {
// TEMP DIAG (2): control block already gone (use-after-free) by here?
if (auto wp = weak_from_this(); wp.expired()) {
std::fprintf(stderr, "GH-49958: ScheduleBatch this=%p EXPIRED use_count=%d\n",
static_cast<void*>(this), static_cast<int>(wp.use_count()));
std::fflush(stderr);
}
file_tasks_->AddSimpleTask(
[self = shared_from_this(), batch = std::move(batch)]() {
[self = SfwDbg(__LINE__), batch = std::move(batch)]() {
return self->WriteNext(std::move(batch));
},
"DatasetWriter::WriteBatch"sv);
Expand Down Expand Up @@ -234,7 +265,7 @@ class DatasetWriterFileQueue
// At this point all write tasks have been added. Because the scheduler
// is a 1-task FIFO we know this task will run at the very end and can
// add it now.
file_tasks_->AddSimpleTask([self = shared_from_this()] { return self->DoFinish(); },
file_tasks_->AddSimpleTask([self = SfwDbg(__LINE__)] { return self->DoFinish(); },
"DatasetWriter::FinishFile"sv);
file_tasks_.reset();
return Status::OK();
Expand All @@ -244,7 +275,7 @@ class DatasetWriterFileQueue
Future<> WriteNext(std::shared_ptr<RecordBatch> next) {
// May want to prototype / measure someday pushing the async write down further
return DeferNotOk(options_.filesystem->io_context().executor()->Submit(
[self = shared_from_this(), batch = std::move(next)]() {
[self = SfwDbg(__LINE__), batch = std::move(next)]() {
int64_t rows_to_release = batch->num_rows();
Status status = self->writer_->Write(batch);
self->writer_state_->rows_in_flight_throttle.Release(rows_to_release);
Expand All @@ -258,7 +289,7 @@ class DatasetWriterFileQueue
RETURN_NOT_OK(options_.writer_pre_finish(writer_.get()));
}
return writer_->Finish().Then(
[self = shared_from_this(), writer_post_finish = options_.writer_post_finish]() {
[self = SfwDbg(__LINE__), writer_post_finish = options_.writer_post_finish]() {
std::lock_guard<std::mutex> lg(self->writer_state_->visitors_mutex);
return writer_post_finish(self->writer_.get());
});
Expand Down Expand Up @@ -300,6 +331,20 @@ class DatasetWriterDirectoryQueue
}
}

// TEMP DEBUG (GH-49958): wrapper around shared_from_this() that names the
// calling source line on stderr when std::bad_weak_ptr is thrown, then rethrows
// so the caller observes the same exception as a direct call would produce.
//
// \param line source line of the call site (call sites pass __LINE__)
// \return the shared_ptr obtained from shared_from_this()
std::shared_ptr<DatasetWriterDirectoryQueue> SfwDbg(int line) try {
  return shared_from_this();
} catch (const std::bad_weak_ptr&) {
  // Function parameters remain in scope inside a function-try-block handler.
  std::fprintf(
      stderr,
      "GH-49958: bad_weak_ptr DatasetWriterDirectoryQueue dataset_writer.cc:%d\n",
      line);
  std::fflush(stderr);
  throw;
}

Result<std::shared_ptr<RecordBatch>> NextWritableChunk(
std::shared_ptr<RecordBatch> batch, std::shared_ptr<RecordBatch>* remainder,
bool* will_open_file) const {
Expand Down Expand Up @@ -359,9 +404,10 @@ class DatasetWriterDirectoryQueue
}

Status OpenFileQueue(const std::string& filename) {
latest_open_file_.reset(
new DatasetWriterFileQueue(schema_, write_options_, writer_state_));
auto file_finish_task = [self = shared_from_this()] {
// TEMP DIAG (1): make_shared instead of reset to wire _M_weak_this
latest_open_file_ =
std::make_shared<DatasetWriterFileQueue>(schema_, write_options_, writer_state_);
auto file_finish_task = [self = SfwDbg(__LINE__)] {
self->writer_state_->open_files_throttle.Release(1);
return Status::OK();
};
Expand Down
169 changes: 169 additions & 0 deletions cpp/tools/gh-49958-mingw-bad-weak-ptr-repro.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Minimal standalone repro for MinGW gcc 16.1 shared_ptr/weak_ptr regression.
// Build (MSYS2 MINGW64):
// g++ -std=gnu++20 -O2 -g -pthread gh-49958-mingw-bad-weak-ptr-repro.cc -o repro
// Run (argument in seconds):
// ./repro 30

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cinttypes>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <thread>
#include <vector>

namespace {

// Object under test. It is owned by a std::shared_ptr in main(); Check()
// exercises enable_shared_from_this while the static members record progress
// for Dump().
struct State : public std::enable_shared_from_this<State> {
  // Number of Check() calls that have started.
  static inline std::atomic<uint64_t> check_enter{0};
  // Number of Check() calls in which shared_from_this() returned.
  static inline std::atomic<uint64_t> check_ok{0};
  // Most recent stage marker written by any thread: Check() writes 5 and 6,
  // the worker loop in main() writes 1 through 4.
  static inline std::atomic<int> stage{0};

  // Obtains (and immediately drops) a shared_ptr to *this. check_enter and
  // stage=5 are recorded before the shared_from_this() call, check_ok and
  // stage=6 after it returns. shared_from_this() throws std::bad_weak_ptr when
  // no shared_ptr owns *this; the exception propagates to the caller.
  void Check() {
    check_enter.fetch_add(1, std::memory_order_relaxed);
    stage.store(5, std::memory_order_relaxed);  // in shared_from_this
    [[maybe_unused]] const auto self = shared_from_this();
    check_ok.fetch_add(1, std::memory_order_relaxed);
    stage.store(6, std::memory_order_relaxed);  // shared_from_this returned
  }
};

// Worker-loop statistics, updated by the worker threads in main() and printed
// by Dump().
std::atomic<uint64_t> lock_ok(0);    // weak_ptr::lock() calls that returned non-null
std::atomic<uint64_t> lock_fail(0);  // weak_ptr::lock() calls that returned null
std::atomic<uint64_t> checks(0);     // State::Check() calls that returned normally

// Failure flags; main() reports "REPRODUCED" when either one is set.
std::atomic<bool> saw_bad_weak_ptr(false);        // Check() threw std::bad_weak_ptr
std::atomic<bool> saw_impossible_expired(false);  // lock() failed while owner was alive

void Dump(const char* tag) {
std::fprintf(stderr,
"%s: stage=%d checks=%" PRIu64 " lock_ok=%" PRIu64 " lock_fail=%" PRIu64
" check_enter=%" PRIu64 " check_ok=%" PRIu64
" bad_weak_ptr=%d impossible_expired=%d\n",
tag, State::stage.load(std::memory_order_relaxed),
checks.load(std::memory_order_relaxed),
lock_ok.load(std::memory_order_relaxed),
lock_fail.load(std::memory_order_relaxed),
State::check_enter.load(std::memory_order_relaxed),
State::check_ok.load(std::memory_order_relaxed),
saw_bad_weak_ptr.load(std::memory_order_relaxed) ? 1 : 0,
saw_impossible_expired.load(std::memory_order_relaxed) ? 1 : 0);
}

// Signal handler that main() installs for SIGSEGV and SIGABRT. Reports the
// signal number, dumps the diagnostic counters under the "crash_state" tag and
// terminates the process via std::_Exit with status 128 + sig.
// NOTE(review): std::fprintf is not async-signal-safe; this is best-effort
// crash reporting for a diagnostic repro.
//
// \param sig number of the signal being handled
void OnSignal(int sig) {
  const int exit_status = 128 + sig;
  std::fprintf(stderr, "caught signal %d\n", sig);
  Dump("crash_state");
  std::_Exit(exit_status);
}

} // namespace

// Entry point of the repro. Starts kThreads worker threads that repeatedly
// lock a weak_ptr to one shared State and call State::Check() on it, for
// argv[1] seconds (default 10, minimum 1) or until a failure is seen.
//
// Exit status:
//   0  no failure observed
//   2  weak_ptr::lock() returned null while the owning shared_ptr was alive
//   3  std::bad_weak_ptr was thrown by Check() (checked before status 2)
// OnSignal exits with 128 + signal number on SIGSEGV/SIGABRT.
int main(int argc, char** argv) {
// Make stderr unbuffered and install the crash handlers before any work starts.
std::setvbuf(stderr, nullptr, _IONBF, 0);
std::signal(SIGSEGV, OnSignal);
std::signal(SIGABRT, OnSignal);

// Optional run time in seconds; values below 1 (including the 0 that atoi
// returns for non-numeric input) are raised to 1.
int run_seconds = 10;
if (argc > 1) {
run_seconds = std::max(1, std::atoi(argv[1]));
}

constexpr int kThreads = 32;

// `owner` is the only long-lived strong reference; workers reach the object
// through copies of `weak`.
auto owner = std::make_shared<State>();
std::weak_ptr<State> weak = owner;

// `stop` ends all worker loops. `owner_is_alive` stays true until after the
// workers are joined, so a failed lock() observed while it is true is the
// "impossible" case this repro looks for.
std::atomic<bool> stop{false};
std::atomic<bool> owner_is_alive{true};

// Worker body; captures the locals above by reference, which is safe because
// every thread is joined before main() returns.
auto worker = [&] {
// Each thread works on its own weak_ptr copy.
auto weak_local = weak;
while (!stop.load(std::memory_order_relaxed)) {
State::stage.store(1, std::memory_order_relaxed); // loop
auto sp = weak_local.lock();
if (!sp) {
State::stage.store(3, std::memory_order_relaxed); // lock failed
lock_fail.fetch_add(1, std::memory_order_relaxed);
// A null result while the owner is still alive is a reproduction: record it
// and ask every thread to stop.
if (owner_is_alive.load(std::memory_order_relaxed)) {
saw_impossible_expired.store(true, std::memory_order_relaxed);
stop.store(true, std::memory_order_relaxed);
}
continue;
}
lock_ok.fetch_add(1, std::memory_order_relaxed);
State::stage.store(2, std::memory_order_relaxed); // sp obtained

// Check() calls shared_from_this(); bad_weak_ptr here is the other
// reproduction signal. Any other exception escapes the thread.
try {
State::stage.store(4, std::memory_order_relaxed); // before Check
sp->Check();
checks.fetch_add(1, std::memory_order_relaxed);
} catch (const std::bad_weak_ptr&) {
saw_bad_weak_ptr.store(true, std::memory_order_relaxed);
stop.store(true, std::memory_order_relaxed);
return;
}

// Create and drop eight more strong references per iteration; presumably
// this is here to add reference-count traffic on the shared control block.
std::shared_ptr<State> copies[8] = {sp, sp, sp, sp, sp, sp, sp, sp};
(void)copies;
}
};

std::vector<std::thread> threads;
threads.reserve(kThreads);
for (int i = 0; i < kThreads; ++i) {
threads.emplace_back(worker);
}

// Poll every 10 ms until the deadline passes or a worker requests a stop.
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(run_seconds);
while (std::chrono::steady_clock::now() < deadline &&
!stop.load(std::memory_order_relaxed)) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

stop.store(true, std::memory_order_relaxed);
for (auto& t : threads) {
t.join();
}

// All workers have been joined, so releasing the owner can no longer be
// observed by any of them.
owner_is_alive.store(false, std::memory_order_relaxed);
owner.reset();

Dump("done");

// bad_weak_ptr takes precedence over the failed-lock case when both were seen.
if (saw_bad_weak_ptr.load(std::memory_order_relaxed)) {
std::fprintf(stderr, "REPRODUCED: unexpected std::bad_weak_ptr while object alive\n");
return 3;
}
if (saw_impossible_expired.load(std::memory_order_relaxed)) {
std::fprintf(
stderr, "REPRODUCED: weak_ptr.lock() failed while owning shared_ptr was alive\n");
return 2;
}

std::fprintf(stderr, "No failure observed in this run\n");
return 0;
}
Loading