diff --git a/include/boost/capy/ex/detail/strand_service.hpp b/include/boost/capy/ex/detail/strand_service.hpp index 6f60c2c2..8f4dc144 100644 --- a/include/boost/capy/ex/detail/strand_service.hpp +++ b/include/boost/capy/ex/detail/strand_service.hpp @@ -10,6 +10,7 @@ #ifndef BOOST_CAPY_EX_DETAIL_STRAND_SERVICE_HPP #define BOOST_CAPY_EX_DETAIL_STRAND_SERVICE_HPP +#include #include #include #include @@ -69,14 +70,14 @@ class BOOST_CAPY_DECL strand_service dispatch( std::shared_ptr const& impl, executor_ref ex, - std::coroutine_handle<> h); + continuation& c); /** Post to strand queue. */ static void post( std::shared_ptr const& impl, executor_ref ex, - std::coroutine_handle<> h); + continuation& c); protected: strand_service(); diff --git a/include/boost/capy/ex/strand.hpp b/include/boost/capy/ex/strand.hpp index f639db8c..f607610d 100644 --- a/include/boost/capy/ex/strand.hpp +++ b/include/boost/capy/ex/strand.hpp @@ -59,7 +59,9 @@ namespace capy { thread_pool pool(4); auto strand = make_strand(pool.get_executor()); - // These continuations will never run concurrently + // Continuations are linked intrusively into the strand's queue, + // so each one must outlive its time there. Storage is typically + // owned by the awaitable or operation state that posted it. continuation c1{h1}, c2{h2}, c3{h3}; strand.post(c1); strand.post(c2); @@ -220,7 +222,7 @@ class strand void post(continuation& c) const { - detail::strand_service::post(impl_, executor_ref(ex_), c.h); + detail::strand_service::post(impl_, executor_ref(ex_), c); } /** Dispatch a continuation through the strand. @@ -243,7 +245,7 @@ class strand std::coroutine_handle<> dispatch(continuation& c) const { - return detail::strand_service::dispatch(impl_, executor_ref(ex_), c.h); + return detail::strand_service::dispatch(impl_, executor_ref(ex_), c); } }; diff --git a/src/ex/detail/strand_queue.hpp b/src/ex/detail/strand_queue.hpp index 1ff84027..0c4a2c9c 100644 --- a/src/ex/detail/strand_queue.hpp +++ b/src/ex/detail/strand_queue.hpp @@ -1,5 +1,6 @@ // // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Michael Vandeberg // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -10,233 +11,68 @@ #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP +#include #include #include -#include -#include -#include - namespace boost { namespace capy { namespace detail { -class strand_queue; - -//---------------------------------------------------------- - -// Metadata stored before the coroutine frame -struct frame_prefix -{ - frame_prefix* next; - strand_queue* queue; - std::size_t alloc_size; -}; - -//---------------------------------------------------------- +/** Single-threaded intrusive FIFO of pending continuations. -/** Wrapper coroutine for strand queue dispatch operations. - - This coroutine wraps a target coroutine handle and resumes - it when dispatched. The wrapper ensures control returns to - the dispatch loop after the target suspends or completes. - - The promise contains an intrusive list node for queue - storage and supports a custom allocator that recycles - coroutine frames via a free list. -*/ -struct strand_op -{ - struct promise_type - { - promise_type* next = nullptr; - - void* - operator new( - std::size_t size, - strand_queue& q, - std::coroutine_handle); - - void - operator delete(void* p, std::size_t); - - strand_op - get_return_object() noexcept - { - return {std::coroutine_handle::from_promise(*this)}; - } - - std::suspend_always - initial_suspend() noexcept - { - return {}; - } - - std::suspend_always - final_suspend() noexcept - { - return {}; - } - - void - return_void() noexcept - { - } - - void - unhandled_exception() - { - std::terminate(); - } - }; - - std::coroutine_handle h_; -}; - -//---------------------------------------------------------- - -/** Single-threaded dispatch queue for coroutine handles. - - This queue stores coroutine handles and resumes them - sequentially when dispatch() is called. Each pushed - handle is wrapped in a strand_op coroutine that ensures - control returns to the dispatch loop after the target - suspends or completes. - - The queue uses an intrusive singly-linked list through - the promise type to avoid separate node allocations. - A free list recycles wrapper coroutine frames to reduce - allocation overhead during repeated push/dispatch cycles. + Links continuations directly through `continuation::next`, so + push() carries no per-item allocation. @par Thread Safety - This class is not thread-safe. All operations must be - called from a single thread. + Not thread-safe. Caller must externally synchronize push() and + take_all(). dispatch_batch() does not touch queue state and may + run unlocked once the batch has been taken. */ class strand_queue { - using promise_type = strand_op::promise_type; - - promise_type* head_ = nullptr; - promise_type* tail_ = nullptr; - frame_prefix* free_list_ = nullptr; - - friend struct strand_op::promise_type; - - static - strand_op - make_strand_op( - strand_queue& q, - std::coroutine_handle target) - { - (void)q; - safe_resume(target); - co_return; - } + continuation* head_ = nullptr; + continuation* tail_ = nullptr; public: strand_queue() = default; - strand_queue(strand_queue const&) = delete; strand_queue& operator=(strand_queue const&) = delete; - /** Destructor. - - Destroys any pending wrappers without resuming them, - then frees all memory in the free list. - */ - ~strand_queue() - { - // Destroy pending wrappers - while(head_) - { - promise_type* p = head_; - head_ = p->next; - - auto h = std::coroutine_handle::from_promise(*p); - h.destroy(); - } - - // Free the free list memory - while(free_list_) - { - frame_prefix* prefix = free_list_; - free_list_ = prefix->next; - ::operator delete(prefix); - } - } - - /** Returns true if there are no pending operations. - */ + /** Returns true if there are no pending continuations. */ bool empty() const noexcept { return head_ == nullptr; } - /** Push a coroutine handle to the queue. - - Creates a wrapper coroutine and appends it to the - queue. The wrapper will resume the target handle - when dispatch() processes it. + /** Push a continuation to the queue. - @param h The coroutine handle to dispatch. + @param c The continuation to enqueue; see `continuation` + for lifetime and aliasing requirements. */ void - push(std::coroutine_handle h) + push(continuation& c) noexcept { - strand_op op = make_strand_op(*this, h); - - promise_type* p = &op.h_.promise(); - p->next = nullptr; - + c.next = nullptr; if(tail_) - tail_->next = p; + tail_->next = &c; else - head_ = p; - tail_ = p; - } - - /** Resume all queued coroutines in sequence. - - Processes each wrapper in FIFO order, resuming its - target coroutine. After each target suspends or - completes, the wrapper is destroyed and its frame - is added to the free list for reuse. - - Coroutines resumed during dispatch may push new - handles, which will also be processed in the same - dispatch call. - - @warning Not thread-safe. Do not call while another - thread may be calling push(). - */ - void - dispatch() - { - while(head_) - { - promise_type* p = head_; - head_ = p->next; - if(!head_) - tail_ = nullptr; - - auto h = std::coroutine_handle::from_promise(*p); - safe_resume(h); - h.destroy(); - } + head_ = &c; + tail_ = &c; } /** Batch of taken items for thread-safe dispatch. */ struct taken_batch { - promise_type* head = nullptr; - promise_type* tail = nullptr; + continuation* head = nullptr; + continuation* tail = nullptr; }; /** Take all pending items atomically. - Removes all items from the queue and returns them - as a batch. The queue is left empty. + Removes all items from the queue and returns them as a + batch. The queue is left empty. @return The batch of taken items. */ @@ -248,13 +84,16 @@ class strand_queue return batch; } - /** Dispatch a batch of taken items. + /** Resume each continuation in a taken batch. + + Advances past each node before resuming, since the + resumed coroutine may destroy the awaitable (and thus + the continuation) before control returns here. @param batch The batch to dispatch. - @note This is thread-safe w.r.t. push() because it doesn't - access the queue's free_list_. Frames are deleted directly - rather than recycled. + @note Thread-safe with respect to push() because the queue + itself is not touched. */ static void @@ -262,69 +101,14 @@ class strand_queue { while(batch.head) { - promise_type* p = batch.head; - batch.head = p->next; - - auto h = std::coroutine_handle::from_promise(*p); - safe_resume(h); - // Don't use h.destroy() - it would call operator delete which - // accesses the queue's free_list_ (race with push). - // Instead, manually free the frame without recycling. - // h.address() returns the frame base (what operator new returned). - frame_prefix* prefix = static_cast(h.address()) - 1; - ::operator delete(prefix); + continuation* c = batch.head; + batch.head = c->next; + safe_resume(c->h); } batch.tail = nullptr; } }; -//---------------------------------------------------------- - -inline -void* -strand_op::promise_type::operator new( - std::size_t size, - strand_queue& q, - std::coroutine_handle) -{ - // Total size includes prefix - std::size_t alloc_size = size + sizeof(frame_prefix); - void* raw; - - // Try to reuse from free list - if(q.free_list_) - { - frame_prefix* prefix = q.free_list_; - q.free_list_ = prefix->next; - raw = prefix; - } - else - { - raw = ::operator new(alloc_size); - } - - // Initialize prefix - frame_prefix* prefix = static_cast(raw); - prefix->next = nullptr; - prefix->queue = &q; - prefix->alloc_size = alloc_size; - - // Return pointer AFTER the prefix (this is where coroutine frame goes) - return prefix + 1; -} - -inline -void -strand_op::promise_type::operator delete(void* p, std::size_t) -{ - // Calculate back to get the prefix - frame_prefix* prefix = static_cast(p) - 1; - - // Add to free list - prefix->next = prefix->queue->free_list_; - prefix->queue->free_list_ = prefix; -} - } // namespace detail } // namespace capy } // namespace boost diff --git a/src/ex/detail/strand_service.cpp b/src/ex/detail/strand_service.cpp index 788696e6..a7005f4c 100644 --- a/src/ex/detail/strand_service.cpp +++ b/src/ex/detail/strand_service.cpp @@ -71,10 +71,10 @@ class strand_service_impl : public strand_service } static bool - enqueue(strand_impl& impl, std::coroutine_handle<> h) + enqueue(strand_impl& impl, continuation& c) { std::lock_guard lock(*impl.mutex_); - impl.pending_.push(h); + impl.pending_.push(c); if(!impl.locked_) { impl.locked_ = true; @@ -269,12 +269,12 @@ strand_service:: dispatch( std::shared_ptr const& impl, executor_ref ex, - std::coroutine_handle<> h) + continuation& c) { if(running_in_this_thread(*impl)) - return h; + return c.h; - if(strand_service_impl::enqueue(*impl, h)) + if(strand_service_impl::enqueue(*impl, c)) strand_service_impl::post_invoker(impl, ex); return std::noop_coroutine(); } @@ -284,9 +284,9 @@ strand_service:: post( std::shared_ptr const& impl, executor_ref ex, - std::coroutine_handle<> h) + continuation& c) { - if(strand_service_impl::enqueue(*impl, h)) + if(strand_service_impl::enqueue(*impl, c)) strand_service_impl::post_invoker(impl, ex); } diff --git a/test/unit/ex/strand.cpp b/test/unit/ex/strand.cpp index fc92e4ab..00962fc4 100644 --- a/test/unit/ex/strand.cpp +++ b/test/unit/ex/strand.cpp @@ -1,5 +1,6 @@ // // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Michael Vandeberg // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -487,11 +488,14 @@ struct strand_test std::atomic done{false}; std::weak_ptr impl_weak; + // c must outlive its time in the strand queue; the strand + // links it intrusively rather than copying. + continuation c; { auto s = strand(pool.get_executor()); impl_weak = s.impl_; auto coro = make_lifetime_coro(done); - continuation c{coro.handle()}; + c.h = coro.handle(); s.post(c); coro.release(); } // strand handle dropped here @@ -518,14 +522,16 @@ struct strand_test std::vector coros; coros.reserve(num_strands * posts_per_strand); + std::vector conts; + conts.reserve(num_strands * posts_per_strand); for(int i = 0; i < num_strands; ++i) { for(int j = 0; j < posts_per_strand; ++j) { coros.push_back(make_counter_coro(total)); - continuation c{coros.back().handle()}; - strands[i].post(c); + conts.push_back({coros.back().handle()}); + strands[i].post(conts.back()); coros.back().release(); } } @@ -671,12 +677,14 @@ struct strand_test std::vector coros; coros.reserve(N); + std::vector conts; + conts.reserve(N); for(int i = 0; i < N; ++i) { coros.push_back(make_counter_coro(counter)); - continuation c{coros.back().handle()}; - s.post(c); + conts.push_back({coros.back().handle()}); + s.post(conts.back()); coros.back().release(); } @@ -697,17 +705,31 @@ struct strand_test std::vector threads; threads.reserve(num_threads); + // Storage hoisted out of the threads so each continuation + // outlives its time in the strand queue. + std::vector> coros_per_thread(num_threads); + std::vector> conts_per_thread(num_threads); + for(int i = 0; i < num_threads; ++i) + { + coros_per_thread[i].reserve(per_thread); + conts_per_thread[i].reserve(per_thread); + } + for(int i = 0; i < num_threads; ++i) { - threads.emplace_back([&s, &counter]{ - for(int j = 0; j < per_thread; ++j) + threads.emplace_back( + [&s, &counter, + &my_coros = coros_per_thread[i], + &my_conts = conts_per_thread[i]] { - auto coro = make_counter_coro(counter); - continuation c{coro.handle()}; - s.post(c); - coro.release(); - } - }); + for(int j = 0; j < per_thread; ++j) + { + my_coros.push_back(make_counter_coro(counter)); + my_conts.push_back({my_coros.back().handle()}); + s.post(my_conts.back()); + my_coros.back().release(); + } + }); } for(auto& t : threads) @@ -730,13 +752,15 @@ struct strand_test std::vector coros; coros.reserve(N); + std::vector conts; + conts.reserve(N); // Post coroutines with sequential IDs for(int i = 0; i < N; ++i) { coros.push_back(make_order_coro(log, log_mutex, i)); - continuation c{coros.back().handle()}; - s.post(c); + conts.push_back({coros.back().handle()}); + s.post(conts.back()); coros.back().release(); } @@ -833,12 +857,14 @@ struct strand_test std::vector coros; coros.reserve(N); + std::vector conts; + conts.reserve(N); for(int i = 0; i < N; ++i) { coros.push_back(make_tracking_coro()); - continuation c{coros.back().handle()}; - s.post(c); + conts.push_back({coros.back().handle()}); + s.post(conts.back()); coros.back().release(); } @@ -904,12 +930,14 @@ struct strand_test std::vector coros; coros.reserve(N); + std::vector conts; + conts.reserve(N); for(int i = 0; i < N; ++i) { coros.push_back(make_counter_coro(counter)); - continuation c{coros.back().handle()}; - s.post(c); + conts.push_back({coros.back().handle()}); + s.post(conts.back()); coros.back().release(); }