98.14% Lines (158/161)
97.73% Functions (43/44)
| TLA | Baseline | Branch | ||||||
|---|---|---|---|---|---|---|---|---|
| Line | Hits | Code | Line | Hits | Code | |||
| 1 | // | 1 | // | |||||
| 2 | // Copyright (c) 2026 Steve Gerbino | 2 | // Copyright (c) 2026 Steve Gerbino | |||||
| 3 | // | 3 | // | |||||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |||||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |||||
| 6 | // | 6 | // | |||||
| 7 | // Official repository: https://github.com/cppalliance/capy | 7 | // Official repository: https://github.com/cppalliance/capy | |||||
| 8 | // | 8 | // | |||||
| 9 | 9 | |||||||
| 10 | #ifndef BOOST_CAPY_WHEN_ALL_HPP | 10 | #ifndef BOOST_CAPY_WHEN_ALL_HPP | |||||
| 11 | #define BOOST_CAPY_WHEN_ALL_HPP | 11 | #define BOOST_CAPY_WHEN_ALL_HPP | |||||
| 12 | 12 | |||||||
| 13 | #include <boost/capy/detail/config.hpp> | 13 | #include <boost/capy/detail/config.hpp> | |||||
| 14 | + | #include <boost/capy/detail/io_result_combinators.hpp> | ||||||
| 15 | + | #include <boost/capy/continuation.hpp> | ||||||
| 14 | #include <boost/capy/concept/executor.hpp> | 16 | #include <boost/capy/concept/executor.hpp> | |||||
| 15 | #include <boost/capy/concept/io_awaitable.hpp> | 17 | #include <boost/capy/concept/io_awaitable.hpp> | |||||
| 16 | #include <coroutine> | 18 | #include <coroutine> | |||||
| 19 | + | #include <boost/capy/ex/frame_alloc_mixin.hpp> | ||||||
| 17 | #include <boost/capy/ex/io_env.hpp> | 20 | #include <boost/capy/ex/io_env.hpp> | |||||
| 18 | #include <boost/capy/ex/frame_allocator.hpp> | 21 | #include <boost/capy/ex/frame_allocator.hpp> | |||||
| 19 | #include <boost/capy/task.hpp> | 22 | #include <boost/capy/task.hpp> | |||||
| 20 | 23 | |||||||
| 21 | #include <array> | 24 | #include <array> | |||||
| 22 | #include <atomic> | 25 | #include <atomic> | |||||
| 23 | #include <exception> | 26 | #include <exception> | |||||
| 27 | + | #include <memory> | ||||||
| 24 | #include <optional> | 28 | #include <optional> | |||||
| 29 | + | #include <ranges> | ||||||
| 30 | + | #include <stdexcept> | ||||||
| 25 | #include <stop_token> | 31 | #include <stop_token> | |||||
| 26 | #include <tuple> | 32 | #include <tuple> | |||||
| 27 | #include <type_traits> | 33 | #include <type_traits> | |||||
| 28 | #include <utility> | 34 | #include <utility> | |||||
| 35 | + | #include <vector> | ||||||
| 29 | 36 | |||||||
| 30 | namespace boost { | 37 | namespace boost { | |||||
| 31 | namespace capy { | 38 | namespace capy { | |||||
| 32 | 39 | |||||||
| 33 | namespace detail { | 40 | namespace detail { | |||||
| 34 | - | /** Type trait to filter void types from a tuple. | ||||||
| 35 | - | |||||||
| 36 | - | Void-returning tasks do not contribute a value to the result tuple. | ||||||
| 37 | - | This trait computes the filtered result type. | ||||||
| 38 | - | |||||||
| 39 | - | Example: filter_void_tuple_t<int, void, string> = tuple<int, string> | ||||||
| 40 | - | */ | ||||||
| 41 | - | template<typename T> | ||||||
| 42 | - | using wrap_non_void_t = std::conditional_t<std::is_void_v<T>, std::tuple<>, std::tuple<T>>; | ||||||
| 43 | - | |||||||
| 44 | - | template<typename... Ts> | ||||||
| 45 | - | using filter_void_tuple_t = decltype(std::tuple_cat(std::declval<wrap_non_void_t<Ts>>()...)); | ||||||
| 46 | - | |||||||
| 47 | 41 | |||||||
| 48 | /** Holds the result of a single task within when_all. | 42 | /** Holds the result of a single task within when_all. | |||||
| 49 | */ | 43 | */ | |||||
| 50 | template<typename T> | 44 | template<typename T> | |||||
| 51 | struct result_holder | 45 | struct result_holder | |||||
| 52 | { | 46 | { | |||||
| 53 | std::optional<T> value_; | 47 | std::optional<T> value_; | |||||
| 54 | 48 | |||||||
| HITCBC | 55 | 60 | void set(T v) | 49 | 81 | void set(T v) | ||
| 56 | { | 50 | { | |||||
| HITCBC | 57 | 60 | value_ = std::move(v); | 51 | 81 | value_ = std::move(v); | ||
| HITCBC | 58 | 60 | } | 52 | 81 | } | ||
| 59 | 53 | |||||||
| HITCBC | 60 | 53 | T get() && | 54 | 69 | T get() && | ||
| 61 | { | 55 | { | |||||
| HITCBC | 62 | 53 | return std::move(*value_); | 56 | 69 | return std::move(*value_); | ||
| 63 | } | 57 | } | |||||
| 64 | }; | 58 | }; | |||||
| 65 | 59 | |||||||
| 66 | - | /** Specialization for void tasks - no value storage needed. | 60 | + | /** Core shared state for when_all operations. | |||
| 67 | - | */ | ||||||
| 68 | - | template<> | ||||||
| 69 | - | struct result_holder<void> | ||||||
| 70 | - | { | ||||||
| 71 | - | }; | ||||||
| 72 | 61 | |||||||
| 73 | - | /** Shared state for when_all operation. | 62 | + | Contains all members and methods common to both heterogeneous (variadic) | |||
| 63 | + | and homogeneous (range) when_all implementations. State classes embed | ||||||
| 64 | + | this via composition to avoid CRTP destructor ordering issues. | ||||||
| 74 | 65 | |||||||
| 75 | - | @tparam Ts The result types of the tasks. | 66 | + | @par Thread Safety | |||
| 67 | + | Atomic operations protect exception capture and completion count. | ||||||
| 76 | */ | 68 | */ | |||||
| 77 | - | template<typename... Ts> | 69 | + | struct when_all_core | |||
| 78 | - | struct when_all_state | ||||||
| 79 | - | static constexpr std::size_t task_count = sizeof...(Ts); | ||||||
| 80 | - | |||||||
| 81 | - | // Completion tracking - when_all waits for all children | ||||||
| 82 | { | 70 | { | |||||
| 83 | std::atomic<std::size_t> remaining_count_; | 71 | std::atomic<std::size_t> remaining_count_; | |||||
| 84 | - | // Result storage in input order | ||||||
| 85 | - | std::tuple<result_holder<Ts>...> results_; | ||||||
| 86 | - | |||||||
| 87 | - | // Runner handles - destroyed in await_resume while allocator is valid | ||||||
| 88 | - | std::array<std::coroutine_handle<>, task_count> runner_handles_{}; | ||||||
| 89 | - | |||||||
| 90 | 72 | |||||||
| 91 | // Exception storage - first error wins, others discarded | 73 | // Exception storage - first error wins, others discarded | |||||
| 92 | std::atomic<bool> has_exception_{false}; | 74 | std::atomic<bool> has_exception_{false}; | |||||
| 93 | std::exception_ptr first_exception_; | 75 | std::exception_ptr first_exception_; | |||||
| 94 | - | // Stop propagation - on error, request stop for siblings | ||||||
| 95 | 76 | |||||||
| 96 | std::stop_source stop_source_; | 77 | std::stop_source stop_source_; | |||||
| 97 | 78 | |||||||
| 98 | - | // Connects parent's stop_token to our stop_source | 79 | + | // Bridges parent's stop token to our stop_source | |||
| 99 | struct stop_callback_fn | 80 | struct stop_callback_fn | |||||
| 100 | { | 81 | { | |||||
| 101 | std::stop_source* source_; | 82 | std::stop_source* source_; | |||||
| HITCBC | 102 | 4 | void operator()() const { source_->request_stop(); } | 83 | 1 | void operator()() const { source_->request_stop(); } | ||
| 103 | }; | 84 | }; | |||||
| 104 | using stop_callback_t = std::stop_callback<stop_callback_fn>; | 85 | using stop_callback_t = std::stop_callback<stop_callback_fn>; | |||||
| 105 | std::optional<stop_callback_t> parent_stop_callback_; | 86 | std::optional<stop_callback_t> parent_stop_callback_; | |||||
| 106 | 87 | |||||||
| 107 | - | // Parent resumption | 88 | + | continuation continuation_; | |||
| 108 | - | std::coroutine_handle<> continuation_; | ||||||
| 109 | io_env const* caller_env_ = nullptr; | 89 | io_env const* caller_env_ = nullptr; | |||||
| 110 | 90 | |||||||
| HITCBC | 111 | - | 59 | when_all_state() | 91 | + | 72 | explicit when_all_core(std::size_t count) noexcept |
| HITCBC | 112 | - | 59 | : remaining_count_(task_count) | 92 | + | 72 | : remaining_count_(count) |
| 113 | { | 93 | { | |||||
| HITCBC | 114 | 59 | } | 94 | 72 | } | ||
| 115 | 95 | |||||||
| 116 | - | // Runners self-destruct in final_suspend. No destruction needed here. | 96 | + | /** Capture an exception (first one wins). */ | |||
| 117 | - | |||||||
| 118 | - | /** Capture an exception (first one wins). | ||||||
| 119 | - | */ | ||||||
| HITCBC | 120 | 20 | void capture_exception(std::exception_ptr ep) | 97 | 19 | void capture_exception(std::exception_ptr ep) | ||
| 121 | { | 98 | { | |||||
| HITCBC | 122 | 20 | bool expected = false; | 99 | 19 | bool expected = false; | ||
| HITCBC | 123 | 20 | if(has_exception_.compare_exchange_strong( | 100 | 19 | if(has_exception_.compare_exchange_strong( | ||
| 124 | expected, true, std::memory_order_relaxed)) | 101 | expected, true, std::memory_order_relaxed)) | |||||
| HITCBC | 125 | 17 | first_exception_ = ep; | 102 | 17 | first_exception_ = ep; | ||
| HITCBC | 126 | 20 | } | 103 | 19 | } | ||
| 104 | + | }; | ||||||
| 105 | + | |||||||
| 106 | + | /** Shared state for heterogeneous when_all (variadic overload). | ||||||
| 107 | + | |||||||
| 108 | + | @tparam Ts The result types of the tasks. | ||||||
| 109 | + | */ | ||||||
| 110 | + | template<typename... Ts> | ||||||
| 111 | + | struct when_all_state | ||||||
| 112 | + | { | ||||||
| 113 | + | static constexpr std::size_t task_count = sizeof...(Ts); | ||||||
| 127 | 114 | |||||||
| 115 | + | when_all_core core_; | ||||||
| 116 | + | std::tuple<result_holder<Ts>...> results_; | ||||||
| 117 | + | std::array<continuation, task_count> runner_handles_{}; | ||||||
| 118 | + | |||||||
| 119 | + | std::atomic<bool> has_error_{false}; | ||||||
| 120 | + | std::error_code first_error_; | ||||||
| 121 | + | |||||||
| HITGNC | 122 | + | 50 | when_all_state() | ||||
| HITGNC | 123 | + | 50 | : core_(task_count) | ||||
| 124 | + | { | ||||||
| HITGNC | 125 | + | 50 | } | ||||
| 126 | + | |||||||
| 127 | + | /** Record the first error (subsequent errors are discarded). */ | ||||||
| HITGNC | 128 | + | 43 | void record_error(std::error_code ec) | ||||
| 129 | + | { | ||||||
| HITGNC | 130 | + | 43 | bool expected = false; | ||||
| HITGNC | 131 | + | 43 | if(has_error_.compare_exchange_strong( | ||||
| 132 | + | expected, true, std::memory_order_relaxed)) | ||||||
| HITGNC | 133 | + | 29 | first_error_ = ec; | ||||
| HITGNC | 134 | + | 43 | } | ||||
| 128 | }; | 135 | }; | |||||
| 129 | 136 | |||||||
| 130 | - | /** Wrapper coroutine that intercepts task completion. | 137 | + | /** Shared state for homogeneous when_all (range overload). | |||
| 131 | 138 | |||||||
| 132 | - | This runner awaits its assigned task and stores the result in | 139 | + | Stores extracted io_result payloads in a vector indexed by task | |||
| 133 | - | the shared state, or captures the exception and requests stop. | 140 | + | position. Tracks the first error_code for error propagation. | |||
| 141 | + | |||||||
| 142 | + | @tparam T The payload type extracted from io_result. | ||||||
| 134 | */ | 143 | */ | |||||
| 135 | - | template<typename T, typename... Ts> | 144 | + | template<typename T> | |||
| 136 | - | struct when_all_runner | 145 | + | struct when_all_homogeneous_state | |||
| 137 | { | 146 | { | |||||
| 138 | - | struct promise_type // : frame_allocating_base // DISABLED FOR TESTING | 147 | + | when_all_core core_; | |||
| 148 | + | std::vector<std::optional<T>> results_; | ||||||
| 149 | + | std::unique_ptr<continuation[]> runner_handles_; | ||||||
| 150 | + | |||||||
| 151 | + | std::atomic<bool> has_error_{false}; | ||||||
| 152 | + | std::error_code first_error_; | ||||||
| 153 | + | |||||||
| HITGNC | 154 | + | 11 | explicit when_all_homogeneous_state(std::size_t count) | ||||
| HITGNC | 155 | + | 11 | : core_(count) | ||||
| HITGNC | 156 | + | 22 | , results_(count) | ||||
| HITGNC | 157 | + | 11 | , runner_handles_(std::make_unique<continuation[]>(count)) | ||||
| 139 | { | 158 | { | |||||
| HITGIC | 140 | - | when_all_state<Ts...>* state_ = nullptr; | 159 | + | 11 | } | |
| 160 | + | |||||||
| HITGNC | 161 | + | 16 | void set_result(std::size_t index, T value) | ||||
| 162 | + | { | ||||||
| HITGNC | 163 | + | 16 | results_[index].emplace(std::move(value)); | ||||
| HITGNC | 164 | + | 16 | } | ||||
| 165 | + | |||||||
| 166 | + | /** Record the first error (subsequent errors are discarded). */ | ||||||
| HITGNC | 167 | + | 7 | void record_error(std::error_code ec) | ||||
| 168 | + | { | ||||||
| HITGNC | 169 | + | 7 | bool expected = false; | ||||
| HITGNC | 170 | + | 7 | if(has_error_.compare_exchange_strong( | ||||
| 171 | + | expected, true, std::memory_order_relaxed)) | ||||||
| HITGNC | 172 | + | 5 | first_error_ = ec; | ||||
| HITGNC | 173 | + | 7 | } | ||||
| 174 | + | }; | ||||||
| 175 | + | |||||||
| 176 | + | /** Specialization for void io_result children (no payload storage). */ | ||||||
| 177 | + | template<> | ||||||
| 178 | + | struct when_all_homogeneous_state<std::tuple<>> | ||||||
| 179 | + | { | ||||||
| 180 | + | when_all_core core_; | ||||||
| 181 | + | std::unique_ptr<continuation[]> runner_handles_; | ||||||
| 182 | + | |||||||
| 183 | + | std::atomic<bool> has_error_{false}; | ||||||
| 184 | + | std::error_code first_error_; | ||||||
| 185 | + | |||||||
| HITGNC | 186 | + | 3 | explicit when_all_homogeneous_state(std::size_t count) | ||||
| HITGNC | 187 | + | 3 | : core_(count) | ||||
| HITGNC | 188 | + | 3 | , runner_handles_(std::make_unique<continuation[]>(count)) | ||||
| 189 | + | { | ||||||
| HITGNC | 190 | + | 3 | } | ||||
| 191 | + | |||||||
| 192 | + | /** Record the first error (subsequent errors are discarded). */ | ||||||
| HITGNC | 193 | + | 1 | void record_error(std::error_code ec) | ||||
| 194 | + | { | ||||||
| HITGNC | 195 | + | 1 | bool expected = false; | ||||
| HITGNC | 196 | + | 1 | if(has_error_.compare_exchange_strong( | ||||
| 197 | + | expected, true, std::memory_order_relaxed)) | ||||||
| HITGNC | 198 | + | 1 | first_error_ = ec; | ||||
| HITGNC | 199 | + | 1 | } | ||||
| 200 | + | }; | ||||||
| 201 | + | |||||||
| 202 | + | /** Wrapper coroutine that intercepts task completion for when_all. | ||||||
| 203 | + | |||||||
| 204 | + | Parameterized on StateType to work with both heterogeneous (variadic) | ||||||
| 205 | + | and homogeneous (range) state types. All state types expose their | ||||||
| 206 | + | shared members through a `core_` member of type when_all_core. | ||||||
| 207 | + | |||||||
| 208 | + | @tparam StateType The state type (when_all_state or when_all_homogeneous_state). | ||||||
| 209 | + | */ | ||||||
| 210 | + | template<typename StateType> | ||||||
| 211 | + | struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner | ||||||
| 212 | + | { | ||||||
| 213 | + | struct promise_type | ||||||
| 214 | + | : frame_alloc_mixin | ||||||
| 215 | + | { | ||||||
| 216 | + | StateType* state_ = nullptr; | ||||||
| 217 | + | std::size_t index_ = 0; | ||||||
| 141 | io_env env_; | 218 | io_env env_; | |||||
| 142 | 219 | |||||||
| HITCBC | 143 | - | 130 | when_all_runner get_return_object() | 220 | + | 145 | when_all_runner get_return_object() noexcept |
| 144 | { | 221 | { | |||||
| ECB | 145 | - | 130 | return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this)); | 222 | + | return when_all_runner( | |
| HITGNC | 223 | + | 145 | std::coroutine_handle<promise_type>::from_promise(*this)); | ||||
| 146 | } | 224 | } | |||||
| 147 | 225 | |||||||
| HITCBC | 148 | 130 | std::suspend_always initial_suspend() noexcept | 226 | 145 | std::suspend_always initial_suspend() noexcept | ||
| 149 | { | 227 | { | |||||
| HITCBC | 150 | 130 | return {}; | 228 | 145 | return {}; | ||
| 151 | } | 229 | } | |||||
| 152 | 230 | |||||||
| HITCBC | 153 | 130 | auto final_suspend() noexcept | 231 | 145 | auto final_suspend() noexcept | ||
| 154 | { | 232 | { | |||||
| 155 | struct awaiter | 233 | struct awaiter | |||||
| 156 | { | 234 | { | |||||
| 157 | promise_type* p_; | 235 | promise_type* p_; | |||||
| HITGIC | 158 | - | 236 | + | 145 | bool await_ready() const noexcept { return false; } | ||
| HITCBC | 159 | - | 130 | bool await_ready() const noexcept | 237 | + | 145 | auto await_suspend(std::coroutine_handle<> h) noexcept |
| 160 | - | { | ||||||
| DCB | 161 | - | 130 | return false; | ||||
| 162 | - | } | ||||||
| 163 | - | |||||||
| DCB | 164 | - | 130 | std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept | ||||
| 165 | { | 238 | { | |||||
| HITGIC | 166 | - | // Extract everything needed before self-destruction. | 239 | + | 145 | auto& core = p_->state_->core_; | |
| HITCBC | 167 | - | 130 | auto* state = p_->state_; | 240 | + | 145 | auto* counter = &core.remaining_count_; |
| HITCBC | 168 | - | 130 | auto* counter = &state->remaining_count_; | 241 | + | 145 | auto* caller_env = core.caller_env_; |
| HITCBC | 169 | - | 130 | auto* caller_env = state->caller_env_; | 242 | + | 145 | auto& cont = core.continuation_; |
| DCB | 170 | - | 130 | auto cont = state->continuation_; | ||||
| 171 | 243 | |||||||
| HITCBC | 172 | 130 | h.destroy(); | 244 | 145 | h.destroy(); | ||
| 173 | - | // If last runner, dispatch parent for symmetric transfer. | ||||||
| 174 | 245 | |||||||
| HITCBC | 175 | 130 | auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); | 246 | 145 | auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); | ||
| HITCBC | 176 | 130 | if(remaining == 1) | 247 | 145 | if(remaining == 1) | ||
| HITCBC | 177 | - | 59 | return caller_env->executor.dispatch(cont); | 248 | + | 72 | return detail::symmetric_transfer(caller_env->executor.dispatch(cont)); |
| HITCBC | 178 | - | 71 | return std::noop_coroutine(); | 249 | + | 73 | return detail::symmetric_transfer(std::noop_coroutine()); |
| 179 | - | } | ||||||
| 180 | - | |||||||
| DUB | 181 | - | ✗ | void await_resume() const noexcept | ||||
| 182 | - | { | ||||||
| EUB | 183 | ✗ | } | 250 | } | |||
| MISUNC | 251 | + | ✗ | void await_resume() const noexcept {} | ||||
| 184 | }; | 252 | }; | |||||
| HITCBC | 185 | 130 | return awaiter{this}; | 253 | 145 | return awaiter{this}; | ||
| 186 | } | 254 | } | |||||
| 187 | 255 | |||||||
| HITCBC | 188 | - | 110 | void return_void() | 256 | + | 126 | void return_void() noexcept {} |
| 189 | - | { | ||||||
| DCB | 190 | - | 110 | } | ||||
| 191 | 257 | |||||||
| HITCBC | 192 | - | 20 | void unhandled_exception() | 258 | + | 19 | void unhandled_exception() noexcept |
| 193 | { | 259 | { | |||||
| HITCBC | 194 | - | 20 | state_->capture_exception(std::current_exception()); | 260 | + | 19 | state_->core_.capture_exception(std::current_exception()); |
| HITGIC | 195 | - | // Request stop for sibling tasks | 261 | + | 19 | state_->core_.stop_source_.request_stop(); | |
| DCB | 196 | - | 20 | state_->stop_source_.request_stop(); | ||||
| HITCBC | 197 | 20 | } | 262 | 19 | } | ||
| 198 | 263 | |||||||
| 199 | template<class Awaitable> | 264 | template<class Awaitable> | |||||
| 200 | struct transform_awaiter | 265 | struct transform_awaiter | |||||
| 201 | { | 266 | { | |||||
| 202 | std::decay_t<Awaitable> a_; | 267 | std::decay_t<Awaitable> a_; | |||||
| 203 | promise_type* p_; | 268 | promise_type* p_; | |||||
| 204 | 269 | |||||||
| HITCBC | 205 | - | 130 | bool await_ready() | 270 | + | 145 | bool await_ready() { return a_.await_ready(); } |
| HITGIC | 206 | - | { | 271 | + | 145 | decltype(auto) await_resume() { return a_.await_resume(); } | |
| DCB | 207 | - | 130 | return a_.await_ready(); | ||||
| 208 | - | } | ||||||
| 209 | - | |||||||
| DCB | 210 | - | 130 | decltype(auto) await_resume() | ||||
| 211 | - | { | ||||||
| DCB | 212 | - | 130 | return a_.await_resume(); | ||||
| 213 | - | } | ||||||
| 214 | 272 | |||||||
| 215 | template<class Promise> | 273 | template<class Promise> | |||||
| HITCBC | 216 | 129 | auto await_suspend(std::coroutine_handle<Promise> h) | 274 | 144 | auto await_suspend(std::coroutine_handle<Promise> h) | ||
| 217 | { | 275 | { | |||||
| ECB | 218 | - | 129 | return a_.await_suspend(h, &p_->env_); | 276 | + | using R = decltype(a_.await_suspend(h, &p_->env_)); | |
| 277 | + | if constexpr (std::is_same_v<R, std::coroutine_handle<>>) | ||||||
| HITGNC | 278 | + | 144 | return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_)); | ||||
| 279 | + | else | ||||||
| 280 | + | return a_.await_suspend(h, &p_->env_); | ||||||
| 219 | } | 281 | } | |||||
| 220 | }; | 282 | }; | |||||
| 221 | 283 | |||||||
| 222 | template<class Awaitable> | 284 | template<class Awaitable> | |||||
| HITCBC | 223 | 130 | auto await_transform(Awaitable&& a) | 285 | 145 | auto await_transform(Awaitable&& a) | ||
| 224 | { | 286 | { | |||||
| 225 | using A = std::decay_t<Awaitable>; | 287 | using A = std::decay_t<Awaitable>; | |||||
| 226 | if constexpr (IoAwaitable<A>) | 288 | if constexpr (IoAwaitable<A>) | |||||
| 227 | { | 289 | { | |||||
| 228 | return transform_awaiter<Awaitable>{ | 290 | return transform_awaiter<Awaitable>{ | |||||
| HITCBC | 229 | 260 | std::forward<Awaitable>(a), this}; | 291 | 290 | std::forward<Awaitable>(a), this}; | ||
| 230 | } | 292 | } | |||||
| 231 | else | 293 | else | |||||
| 232 | { | 294 | { | |||||
| 233 | static_assert(sizeof(A) == 0, "requires IoAwaitable"); | 295 | static_assert(sizeof(A) == 0, "requires IoAwaitable"); | |||||
| 234 | } | 296 | } | |||||
| HITCBC | 235 | 130 | } | 297 | 145 | } | ||
| 236 | }; | 298 | }; | |||||
| 237 | 299 | |||||||
| 238 | std::coroutine_handle<promise_type> h_; | 300 | std::coroutine_handle<promise_type> h_; | |||||
| 239 | 301 | |||||||
| HITCBC | 240 | - | 130 | explicit when_all_runner(std::coroutine_handle<promise_type> h) | 302 | + | 145 | explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept |
| HITCBC | 241 | 130 | : h_(h) | 303 | 145 | : h_(h) | ||
| 242 | { | 304 | { | |||||
| HITCBC | 243 | 130 | } | 305 | 145 | } | ||
| 244 | 306 | |||||||
| 245 | // Enable move for all clang versions - some versions need it | 307 | // Enable move for all clang versions - some versions need it | |||||
| 246 | - | when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {} | 308 | + | when_all_runner(when_all_runner&& other) noexcept | |||
| 309 | + | : h_(std::exchange(other.h_, nullptr)) | ||||||
| 310 | + | { | ||||||
| 311 | + | } | ||||||
| 247 | - | // Non-copyable | ||||||
| 248 | 312 | |||||||
| 249 | when_all_runner(when_all_runner const&) = delete; | 313 | when_all_runner(when_all_runner const&) = delete; | |||||
| 250 | when_all_runner& operator=(when_all_runner const&) = delete; | 314 | when_all_runner& operator=(when_all_runner const&) = delete; | |||||
| 251 | when_all_runner& operator=(when_all_runner&&) = delete; | 315 | when_all_runner& operator=(when_all_runner&&) = delete; | |||||
| 252 | 316 | |||||||
| HITCBC | 253 | 130 | auto release() noexcept | 317 | 145 | auto release() noexcept | ||
| 254 | { | 318 | { | |||||
| HITCBC | 255 | 130 | return std::exchange(h_, nullptr); | 319 | 145 | return std::exchange(h_, nullptr); | ||
| 256 | } | 320 | } | |||||
| 257 | }; | 321 | }; | |||||
| 258 | 322 | |||||||
| 259 | - | /** Create a runner coroutine for a single awaitable. | 323 | + | /** Create an io_result-aware runner for a single awaitable (range path). | |||
| 260 | 324 | |||||||
| 261 | - | Awaitable is passed directly to ensure proper coroutine frame storage. | 325 | + | Checks the error code, records errors and requests stop on failure, | |||
| 326 | + | or extracts the payload on success. | ||||||
| 262 | */ | 327 | */ | |||||
| 263 | - | template<std::size_t Index, IoAwaitable Awaitable, typename... Ts> | 328 | + | template<IoAwaitable Awaitable, typename StateType> | |||
| 264 | - | when_all_runner<awaitable_result_t<Awaitable>, Ts...> | 329 | + | when_all_runner<StateType> | |||
| HITCBC | 265 | - | 130 | make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state) | 330 | + | 32 | make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index) |
| 266 | { | 331 | { | |||||
| 267 | - | using T = awaitable_result_t<Awaitable>; | 332 | + | auto result = co_await std::move(inner); | |||
| 268 | - | if constexpr (std::is_void_v<T>) | 333 | + | ||||
| 334 | + | if(result.ec) | ||||||
| 269 | { | 335 | { | |||||
| 270 | - | co_await std::move(inner); | 336 | + | state->record_error(result.ec); | |||
| 337 | + | state->core_.stop_source_.request_stop(); | ||||||
| 271 | } | 338 | } | |||||
| 272 | else | 339 | else | |||||
| 273 | { | 340 | { | |||||
| 274 | - | std::get<Index>(state->results_).set(co_await std::move(inner)); | 341 | + | using PayloadT = io_result_payload_t< | |||
| 342 | + | awaitable_result_t<Awaitable>>; | ||||||
| 343 | + | if constexpr (!std::is_same_v<PayloadT, std::tuple<>>) | ||||||
| 344 | + | { | ||||||
| 345 | + | state->set_result(index, | ||||||
| 346 | + | extract_io_payload(std::move(result))); | ||||||
| 347 | + | } | ||||||
| 275 | } | 348 | } | |||||
| HITCBC | 276 | 260 | } | 349 | 64 | } | ||
| 277 | 350 | |||||||
| 278 | - | /** Internal awaitable that launches all runner coroutines and waits. | 351 | + | /** Create a runner for io_result children that requests stop on ec. */ | |||
| 352 | + | template<std::size_t Index, IoAwaitable Awaitable, typename... Ts> | ||||||
| 353 | + | when_all_runner<when_all_state<Ts...>> | ||||||
| HITGNC | 354 | + | 97 | make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state) | ||||
| 355 | + | { | ||||||
| 356 | + | auto result = co_await std::move(inner); | ||||||
| 357 | + | auto ec = result.ec; | ||||||
| 358 | + | std::get<Index>(state->results_).set(std::move(result)); | ||||||
| 279 | 359 | |||||||
| 280 | - | This awaitable is used inside the when_all coroutine to handle | 360 | + | if(ec) | |||
| 281 | - | the concurrent execution of child awaitables. | 361 | + | { | |||
| 282 | - | */ | 362 | + | state->record_error(ec); | |||
| 363 | + | state->core_.stop_source_.request_stop(); | ||||||
| 364 | + | } | ||||||
| HITGNC | 365 | + | 194 | } | ||||
| 366 | + | |||||||
| 367 | + | /** Launcher that uses io_result-aware runners. */ | ||||||
| 283 | template<IoAwaitable... Awaitables> | 368 | template<IoAwaitable... Awaitables> | |||||
| 284 | - | class when_all_launcher | 369 | + | class when_all_io_launcher | |||
| 285 | { | 370 | { | |||||
| 286 | using state_type = when_all_state<awaitable_result_t<Awaitables>...>; | 371 | using state_type = when_all_state<awaitable_result_t<Awaitables>...>; | |||||
| 287 | 372 | |||||||
| 288 | std::tuple<Awaitables...>* awaitables_; | 373 | std::tuple<Awaitables...>* awaitables_; | |||||
| 289 | state_type* state_; | 374 | state_type* state_; | |||||
| 290 | 375 | |||||||
| 291 | public: | 376 | public: | |||||
| HITCBC | 292 | - | 59 | when_all_launcher( | 377 | + | 50 | when_all_io_launcher( |
| 293 | std::tuple<Awaitables...>* awaitables, | 378 | std::tuple<Awaitables...>* awaitables, | |||||
| 294 | state_type* state) | 379 | state_type* state) | |||||
| HITCBC | 295 | 59 | : awaitables_(awaitables) | 380 | 50 | : awaitables_(awaitables) | ||
| HITCBC | 296 | 59 | , state_(state) | 381 | 50 | , state_(state) | ||
| 297 | { | 382 | { | |||||
| HITCBC | 298 | 59 | } | 383 | 50 | } | ||
| 299 | 384 | |||||||
| HITCBC | 300 | 59 | bool await_ready() const noexcept | 385 | 50 | bool await_ready() const noexcept | ||
| 301 | { | 386 | { | |||||
| HITCBC | 302 | 59 | return sizeof...(Awaitables) == 0; | 387 | 50 | return sizeof...(Awaitables) == 0; | ||
| 303 | } | 388 | } | |||||
| 304 | 389 | |||||||
| HITCBC | 305 | - | 59 | std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env) | 390 | + | 50 | std::coroutine_handle<> await_suspend( |
| 391 | + | std::coroutine_handle<> continuation, io_env const* caller_env) | ||||||
| 306 | { | 392 | { | |||||
| HITCBC | 307 | - | 59 | state_->continuation_ = continuation; | 393 | + | 50 | state_->core_.continuation_.h = continuation; |
| HITCBC | 308 | - | 59 | state_->caller_env_ = caller_env; | 394 | + | 50 | state_->core_.caller_env_ = caller_env; |
| 309 | - | // Forward parent's stop requests to children | ||||||
| 310 | 395 | |||||||
| HITCBC | 311 | 59 | if(caller_env->stop_token.stop_possible()) | 396 | 50 | if(caller_env->stop_token.stop_possible()) | ||
| 312 | { | 397 | { | |||||
| HITCBC | 313 | - | 16 | state_->parent_stop_callback_.emplace( | 398 | + | 2 | state_->core_.parent_stop_callback_.emplace( |
| HITCBC | 314 | 8 | caller_env->stop_token, | 399 | 1 | caller_env->stop_token, | ||
| HITCBC | 315 | - | 8 | typename state_type::stop_callback_fn{&state_->stop_source_}); | 400 | + | 1 | when_all_core::stop_callback_fn{&state_->core_.stop_source_}); |
| 316 | 401 | |||||||
| HITCBC | 317 | 8 | if(caller_env->stop_token.stop_requested()) | 402 | 1 | if(caller_env->stop_token.stop_requested()) | ||
| MISLBC | 318 | - | 4 | state_->stop_source_.request_stop(); | 403 | + | ✗ | state_->core_.stop_source_.request_stop(); |
| 319 | } | 404 | } | |||||
| 320 | 405 | |||||||
| HITGIC | 321 | - | // CRITICAL: If the last task finishes synchronously then the parent | 406 | + | 50 | auto token = state_->core_.stop_source_.get_token(); | |
| 322 | - | // coroutine resumes, destroying its frame, and destroying this object | ||||||
| 323 | - | // prior to the completion of await_suspend. Therefore, await_suspend | ||||||
| 324 | - | // must ensure `this` cannot be referenced after calling `launch_one` | ||||||
| 325 | - | // for the last time. | ||||||
| DCB | 326 | - | 59 | auto token = state_->stop_source_.get_token(); | ||||
| HITCBC | 327 | 58 | [&]<std::size_t... Is>(std::index_sequence<Is...>) { | 407 | 46 | [&]<std::size_t... Is>(std::index_sequence<Is...>) { | ||
| HITCBC | 328 | 59 | (..., launch_one<Is>(caller_env->executor, token)); | 408 | 50 | (..., launch_one<Is>(caller_env->executor, token)); | ||
| HITCBC | 329 | 59 | }(std::index_sequence_for<Awaitables...>{}); | 409 | 50 | }(std::index_sequence_for<Awaitables...>{}); | ||
| 330 | - | // Let signal_completion() handle resumption | ||||||
| 331 | 410 | |||||||
| HITCBC | 332 | 118 | return std::noop_coroutine(); | 411 | 100 | return std::noop_coroutine(); | ||
| HITCBC | 333 | 59 | } | 412 | 50 | } | ||
| 334 | 413 | |||||||
| HITCBC | 335 | - | 59 | void await_resume() const noexcept | 414 | + | 50 | void await_resume() const noexcept {} |
| 336 | - | { | ||||||
| 337 | - | // Results are extracted by the when_all coroutine from state | ||||||
| DCB | 338 | - | 59 | } | ||||
| 339 | 415 | |||||||
| 340 | private: | 416 | private: | |||||
| 341 | template<std::size_t I> | 417 | template<std::size_t I> | |||||
| HITCBC | 342 | 130 | void launch_one(executor_ref caller_ex, std::stop_token token) | 418 | 97 | void launch_one(executor_ref caller_ex, std::stop_token token) | ||
| 343 | { | 419 | { | |||||
| HITCBC | 344 | - | 130 | auto runner = make_when_all_runner<I>( | 420 | + | 97 | auto runner = make_when_all_io_runner<I>( |
| HITCBC | 345 | 130 | std::move(std::get<I>(*awaitables_)), state_); | 421 | 97 | std::move(std::get<I>(*awaitables_)), state_); | ||
| 346 | 422 | |||||||
| HITCBC | 347 | 130 | auto h = runner.release(); | 423 | 97 | auto h = runner.release(); | ||
| HITCBC | 348 | 130 | h.promise().state_ = state_; | 424 | 97 | h.promise().state_ = state_; | ||
| HITCBC | 349 | - | 130 | h.promise().env_ = io_env{caller_ex, token, state_->caller_env_->frame_allocator}; | 425 | + | 97 | h.promise().env_ = io_env{caller_ex, token, |
| HITGNC | 426 | + | 97 | state_->core_.caller_env_->frame_allocator}; | ||||
| 350 | 427 | |||||||
| HITCBC | 351 | - | 130 | std::coroutine_handle<> ch{h}; | 428 | + | 97 | state_->runner_handles_[I].h = std::coroutine_handle<>{h}; |
| HITCBC | 352 | - | 130 | state_->runner_handles_[I] = ch; | 429 | + | 97 | state_->core_.caller_env_->executor.post(state_->runner_handles_[I]); |
| DCB | 353 | - | 130 | state_->caller_env_->executor.post(ch); | ||||
| HITCBC | 354 | 260 | } | 430 | 194 | } | ||
| 355 | }; | 431 | }; | |||||
| 356 | 432 | |||||||
| 357 | - | /** Compute the result type for when_all. | 433 | + | /** Helper to extract a single result from state. | |||
| 358 | - | |||||||
| 359 | - | Returns void when all tasks are void (P2300 aligned), | ||||||
| 360 | - | otherwise returns a tuple with void types filtered out. | ||||||
| 361 | - | */ | ||||||
| 362 | - | template<typename... Ts> | ||||||
| 363 | - | using when_all_result_t = std::conditional_t< | ||||||
| 364 | - | std::is_same_v<filter_void_tuple_t<Ts...>, std::tuple<>>, | ||||||
| 365 | - | void, | ||||||
| 366 | - | filter_void_tuple_t<Ts...>>; | ||||||
| 367 | - | |||||||
| 368 | - | /** Helper to extract a single result, returning empty tuple for void. | ||||||
| 369 | This is a separate function to work around a GCC-11 ICE that occurs | 434 | This is a separate function to work around a GCC-11 ICE that occurs | |||||
| 370 | when using nested immediately-invoked lambdas with pack expansion. | 435 | when using nested immediately-invoked lambdas with pack expansion. | |||||
| 371 | */ | 436 | */ | |||||
| 372 | template<std::size_t I, typename... Ts> | 437 | template<std::size_t I, typename... Ts> | |||||
| HITCBC | 373 | 57 | auto extract_single_result(when_all_state<Ts...>& state) | 438 | 69 | auto extract_single_result(when_all_state<Ts...>& state) | ||
| 374 | { | 439 | { | |||||
| HITGIC | 375 | - | using T = std::tuple_element_t<I, std::tuple<Ts...>>; | 440 | + | 69 | return std::move(std::get<I>(state.results_)).get(); | |
| 376 | - | if constexpr (std::is_void_v<T>) | ||||||
| DCB | 377 | - | 4 | return std::tuple<>(); | ||||
| 378 | - | else | ||||||
| DCB | 379 | - | 53 | return std::make_tuple(std::move(std::get<I>(state.results_)).get()); | ||||
| 380 | } | 441 | } | |||||
| 381 | 442 | |||||||
| 382 | - | /** Extract results from state, filtering void types. | 443 | + | /** Extract all results from state as a tuple. | |||
| 383 | */ | 444 | */ | |||||
| 384 | template<typename... Ts> | 445 | template<typename... Ts> | |||||
| HITCBC | 385 | 24 | auto extract_results(when_all_state<Ts...>& state) | 446 | 36 | auto extract_results(when_all_state<Ts...>& state) | ||
| 386 | { | 447 | { | |||||
| HITCBC | 387 | 43 | return [&]<std::size_t... Is>(std::index_sequence<Is...>) { | 448 | 55 | return [&]<std::size_t... Is>(std::index_sequence<Is...>) { | ||
| HITCBC | 388 | - | 25 | return std::tuple_cat(extract_single_result<Is>(state)...); | 449 | + | 36 | return std::tuple(extract_single_result<Is>(state)...); |
| HITCBC | 389 | 48 | }(std::index_sequence_for<Ts...>{}); | 450 | 72 | }(std::index_sequence_for<Ts...>{}); | ||
| 390 | } | 451 | } | |||||
| 391 | 452 | |||||||
| 453 | + | /** Launches all homogeneous runners concurrently. | ||||||
| 454 | + | |||||||
| 455 | + | Two-phase approach: create all runners first, then post all. | ||||||
| 456 | + | This avoids lifetime issues if a task completes synchronously. | ||||||
| 457 | + | */ | ||||||
| 458 | + | template<typename Range> | ||||||
| 459 | + | class when_all_homogeneous_launcher | ||||||
| 460 | + | { | ||||||
| 461 | + | using Awaitable = std::ranges::range_value_t<Range>; | ||||||
| 462 | + | using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>; | ||||||
| 463 | + | |||||||
| 464 | + | Range* range_; | ||||||
| 465 | + | when_all_homogeneous_state<PayloadT>* state_; | ||||||
| 466 | + | |||||||
| 467 | + | public: | ||||||
| HITGNC | 468 | + | 14 | when_all_homogeneous_launcher( | ||||
| 469 | + | Range* range, | ||||||
| 470 | + | when_all_homogeneous_state<PayloadT>* state) | ||||||
| HITGNC | 471 | + | 14 | : range_(range) | ||||
| HITGNC | 472 | + | 14 | , state_(state) | ||||
| 473 | + | { | ||||||
| HITGNC | 474 | + | 14 | } | ||||
| 475 | + | |||||||
| HITGNC | 476 | + | 14 | bool await_ready() const noexcept | ||||
| 477 | + | { | ||||||
| HITGNC | 478 | + | 14 | return std::ranges::empty(*range_); | ||||
| 479 | + | } | ||||||
| 480 | + | |||||||
| HITGNC | 481 | + | 14 | std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env) | ||||
| 482 | + | { | ||||||
| HITGNC | 483 | + | 14 | state_->core_.continuation_.h = continuation; | ||||
| HITGNC | 484 | + | 14 | state_->core_.caller_env_ = caller_env; | ||||
| 485 | + | |||||||
| HITGNC | 486 | + | 14 | if(caller_env->stop_token.stop_possible()) | ||||
| 487 | + | { | ||||||
| HITGNC | 488 | + | 2 | state_->core_.parent_stop_callback_.emplace( | ||||
| HITGNC | 489 | + | 1 | caller_env->stop_token, | ||||
| HITGNC | 490 | + | 1 | when_all_core::stop_callback_fn{&state_->core_.stop_source_}); | ||||
| 491 | + | |||||||
| HITGNC | 492 | + | 1 | if(caller_env->stop_token.stop_requested()) | ||||
| MISUNC | 493 | + | ✗ | state_->core_.stop_source_.request_stop(); | ||||
| 494 | + | } | ||||||
| 495 | + | |||||||
| HITGNC | 496 | + | 14 | auto token = state_->core_.stop_source_.get_token(); | ||||
| 497 | + | |||||||
| 498 | + | // Phase 1: Create all runners without dispatching. | ||||||
| HITGNC | 499 | + | 14 | std::size_t index = 0; | ||||
| HITGNC | 500 | + | 46 | for(auto&& a : *range_) | ||||
| 501 | + | { | ||||||
| HITGNC | 502 | + | 32 | auto runner = make_when_all_homogeneous_runner( | ||||
| HITGNC | 503 | + | 32 | std::move(a), state_, index); | ||||
| 504 | + | |||||||
| HITGNC | 505 | + | 32 | auto h = runner.release(); | ||||
| HITGNC | 506 | + | 32 | h.promise().state_ = state_; | ||||
| HITGNC | 507 | + | 32 | h.promise().index_ = index; | ||||
| HITGNC | 508 | + | 32 | h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator}; | ||||
| 509 | + | |||||||
| HITGNC | 510 | + | 32 | state_->runner_handles_[index].h = std::coroutine_handle<>{h}; | ||||
| HITGNC | 511 | + | 32 | ++index; | ||||
| 512 | + | } | ||||||
| 513 | + | |||||||
| 514 | + | // Phase 2: Post all runners. Any may complete synchronously. | ||||||
| 515 | + | // After last post, state_ and this may be destroyed. | ||||||
| HITGNC | 516 | + | 14 | auto* handles = state_->runner_handles_.get(); | ||||
| HITGNC | 517 | + | 14 | std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed); | ||||
| HITGNC | 518 | + | 46 | for(std::size_t i = 0; i < count; ++i) | ||||
| HITGNC | 519 | + | 32 | caller_env->executor.post(handles[i]); | ||||
| 520 | + | |||||||
| HITGNC | 521 | + | 28 | return std::noop_coroutine(); | ||||
| HITGNC | 522 | + | 46 | } | ||||
| 523 | + | |||||||
| HITGNC | 524 | + | 14 | void await_resume() const noexcept | ||||
| 525 | + | { | ||||||
| HITGNC | 526 | + | 14 | } | ||||
| 527 | + | }; | ||||||
| 528 | + | |||||||
| 392 | } // namespace detail | 529 | } // namespace detail | |||||
| 393 | 530 | |||||||
| 394 | - | /** Execute multiple awaitables concurrently and collect their results. | 531 | + | /** Execute a range of io_result-returning awaitables concurrently. | |||
| 395 | 532 | |||||||
| 396 | - | Launches all awaitables simultaneously and waits for all to complete | 533 | + | Launches all awaitables simultaneously and waits for all to complete. | |||
| 397 | - | before returning. Results are collected in input order. If any | 534 | + | On success, extracted payloads are collected in a vector preserving | |||
| 398 | - | awaitable throws, cancellation is requested for siblings and the first | 535 | + | input order. The first error_code cancels siblings and is propagated | |||
| 399 | - | exception is rethrown after all awaitables complete. | 536 | + | in the outer io_result. Exceptions always beat error codes. | |||
| 400 | 537 | |||||||
| 401 | @li All child awaitables run concurrently on the caller's executor | 538 | @li All child awaitables run concurrently on the caller's executor | |||||
| 402 | - | @li Results are returned as a tuple in input order | 539 | + | @li Payloads are returned as a vector in input order | |||
| 403 | - | @li Void-returning awaitables do not contribute to the result tuple | 540 | + | @li First error_code wins and cancels siblings | |||
| 404 | - | @li If all awaitables return void, `when_all` returns `task<void>` | 541 | + | @li Exception always beats error_code | |||
| 405 | - | @li First exception wins; subsequent exceptions are discarded | ||||||
| 406 | - | @li Stop is requested for siblings on first error | ||||||
| 407 | @li Completes only after all children have finished | 542 | @li Completes only after all children have finished | |||||
| 408 | 543 | |||||||
| 409 | @par Thread Safety | 544 | @par Thread Safety | |||||
| 410 | The returned task must be awaited from a single execution context. | 545 | The returned task must be awaited from a single execution context. | |||||
| 411 | Child awaitables execute concurrently but complete through the caller's | 546 | Child awaitables execute concurrently but complete through the caller's | |||||
| 412 | executor. | 547 | executor. | |||||
| 413 | 548 | |||||||
| 414 | - | @param awaitables The awaitables to execute concurrently. Each must | 549 | + | @param awaitables Range of io_result-returning awaitables to execute | |||
| 415 | - | satisfy @ref IoAwaitable and is consumed (moved-from) when | 550 | + | concurrently (must not be empty). | |||
| 416 | - | `when_all` is awaited. | ||||||
| 417 | 551 | |||||||
| 418 | - | @return A task yielding a tuple of non-void results. Returns | 552 | + | @return A task yielding io_result<vector<PayloadT>> where PayloadT | |||
| 419 | - | `task<void>` when all input awaitables return void. | 553 | + | is the payload extracted from each child's io_result. | |||
| 554 | + | |||||||
| 555 | + | @throws std::invalid_argument if range is empty (thrown before | ||||||
| 556 | + | coroutine suspends). | ||||||
| 557 | + | @throws Rethrows the first child exception after all children | ||||||
| 558 | + | complete (exception beats error_code). | ||||||
| 420 | 559 | |||||||
| 421 | @par Example | 560 | @par Example | |||||
| 561 | + | @code | ||||||
| 562 | + | task<void> example() | ||||||
| 563 | + | { | ||||||
| 564 | + | std::vector<io_task<size_t>> reads; | ||||||
| 565 | + | for (auto& buf : buffers) | ||||||
| 566 | + | reads.push_back(stream.read_some(buf)); | ||||||
| 567 | + | |||||||
| 568 | + | auto [ec, counts] = co_await when_all(std::move(reads)); | ||||||
| 569 | + | if (ec) { // handle error | ||||||
| 570 | + | } | ||||||
| 571 | + | } | ||||||
| 572 | + | @endcode | ||||||
| 422 | 573 | |||||||
| 574 | + | @see IoAwaitableRange, when_all | ||||||
| 575 | + | */ | ||||||
| 576 | + | template<IoAwaitableRange R> | ||||||
| 577 | + | requires detail::is_io_result_v< | ||||||
| 578 | + | awaitable_result_t<std::ranges::range_value_t<R>>> | ||||||
| 579 | + | && (!std::is_same_v< | ||||||
| 580 | + | detail::io_result_payload_t< | ||||||
| 581 | + | awaitable_result_t<std::ranges::range_value_t<R>>>, | ||||||
| 582 | + | std::tuple<>>) | ||||||
| HITGNC | 583 | + | 12 | [[nodiscard]] auto when_all(R&& awaitables) | ||||
| 584 | + | -> task<io_result<std::vector< | ||||||
| 585 | + | detail::io_result_payload_t< | ||||||
| 586 | + | awaitable_result_t<std::ranges::range_value_t<R>>>>>> | ||||||
| 587 | + | { | ||||||
| 588 | + | using Awaitable = std::ranges::range_value_t<R>; | ||||||
| 589 | + | using PayloadT = detail::io_result_payload_t< | ||||||
| 590 | + | awaitable_result_t<Awaitable>>; | ||||||
| 591 | + | using OwnedRange = std::remove_cvref_t<R>; | ||||||
| 592 | + | |||||||
| 593 | + | auto count = std::ranges::size(awaitables); | ||||||
| 594 | + | if(count == 0) | ||||||
| 595 | + | throw std::invalid_argument("when_all requires at least one awaitable"); | ||||||
| 596 | + | |||||||
| 597 | + | OwnedRange owned_awaitables = std::forward<R>(awaitables); | ||||||
| 598 | + | |||||||
| 599 | + | detail::when_all_homogeneous_state<PayloadT> state(count); | ||||||
| 600 | + | |||||||
| 601 | + | co_await detail::when_all_homogeneous_launcher<OwnedRange>( | ||||||
| 602 | + | &owned_awaitables, &state); | ||||||
| 603 | + | |||||||
| 604 | + | if(state.core_.first_exception_) | ||||||
| 605 | + | std::rethrow_exception(state.core_.first_exception_); | ||||||
| 606 | + | |||||||
| 607 | + | if(state.has_error_.load(std::memory_order_relaxed)) | ||||||
| 608 | + | co_return io_result<std::vector<PayloadT>>{state.first_error_, {}}; | ||||||
| 609 | + | |||||||
| 610 | + | std::vector<PayloadT> results; | ||||||
| 611 | + | results.reserve(count); | ||||||
| 612 | + | for(auto& opt : state.results_) | ||||||
| 613 | + | results.push_back(std::move(*opt)); | ||||||
| 614 | + | |||||||
| 615 | + | co_return io_result<std::vector<PayloadT>>{{}, std::move(results)}; | ||||||
| HITGNC | 616 | + | 24 | } | ||||
| 617 | + | |||||||
| 618 | + | /** Execute a range of void io_result-returning awaitables concurrently. | ||||||
| 619 | + | |||||||
| 620 | + | Launches all awaitables simultaneously and waits for all to complete. | ||||||
| 621 | + | Since all awaitables return io_result<>, no payload values are | ||||||
| 622 | + | collected. The first error_code cancels siblings and is propagated. | ||||||
| 623 | + | Exceptions always beat error codes. | ||||||
| 624 | + | |||||||
| 625 | + | @param awaitables Range of io_result<>-returning awaitables to | ||||||
| 626 | + | execute concurrently (must not be empty). | ||||||
| 627 | + | |||||||
| 628 | + | @return A task yielding io_result<> whose ec is the first child | ||||||
| 629 | + | error, or default-constructed on success. | ||||||
| 630 | + | |||||||
| 631 | + | @throws std::invalid_argument if range is empty. | ||||||
| 632 | + | @throws Rethrows the first child exception after all children | ||||||
| 633 | + | complete (exception beats error_code). | ||||||
| 634 | + | |||||||
| 635 | + | @par Example | ||||||
| 423 | @code | 636 | @code | |||||
| 424 | - | task<> example() | 637 | + | task<void> example() | |||
| 425 | { | 638 | { | |||||
| 426 | - | // Concurrent fetch, results collected in order | 639 | + | std::vector<io_task<>> jobs; | |||
| 427 | - | auto [user, posts] = co_await when_all( | 640 | + | for (int i = 0; i < n; ++i) | |||
| 428 | - | fetch_user( id ), // task<User> | 641 | + | jobs.push_back(process(i)); | |||
| 429 | - | fetch_posts( id ) // task<std::vector<Post>> | ||||||
| 430 | - | ); | ||||||
| 431 | 642 | |||||||
| 432 | - | // Void awaitables don't contribute to result | 643 | + | auto [ec] = co_await when_all(std::move(jobs)); | |||
| 433 | - | co_await when_all( | ||||||
| 434 | - | log_event( "start" ), // task<void> | ||||||
| 435 | - | notify_user( id ) // task<void> | ||||||
| 436 | - | ); | ||||||
| 437 | - | // Returns task<void>, no result tuple | ||||||
| 438 | } | 644 | } | |||||
| 439 | @endcode | 645 | @endcode | |||||
| 440 | 646 | |||||||
| 441 | - | @see IoAwaitable, task | 647 | + | @see IoAwaitableRange, when_all | |||
| 648 | + | */ | ||||||
| 649 | + | template<IoAwaitableRange R> | ||||||
| 650 | + | requires detail::is_io_result_v< | ||||||
| 651 | + | awaitable_result_t<std::ranges::range_value_t<R>>> | ||||||
| 652 | + | && std::is_same_v< | ||||||
| 653 | + | detail::io_result_payload_t< | ||||||
| 654 | + | awaitable_result_t<std::ranges::range_value_t<R>>>, | ||||||
| 655 | + | std::tuple<>> | ||||||
| HITGNC | 656 | + | 4 | [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>> | ||||
| 657 | + | { | ||||||
| 658 | + | using OwnedRange = std::remove_cvref_t<R>; | ||||||
| 659 | + | |||||||
| 660 | + | auto count = std::ranges::size(awaitables); | ||||||
| 661 | + | if(count == 0) | ||||||
| 662 | + | throw std::invalid_argument("when_all requires at least one awaitable"); | ||||||
| 663 | + | |||||||
| 664 | + | OwnedRange owned_awaitables = std::forward<R>(awaitables); | ||||||
| 665 | + | |||||||
| 666 | + | detail::when_all_homogeneous_state<std::tuple<>> state(count); | ||||||
| 667 | + | |||||||
| 668 | + | co_await detail::when_all_homogeneous_launcher<OwnedRange>( | ||||||
| 669 | + | &owned_awaitables, &state); | ||||||
| 670 | + | |||||||
| 671 | + | if(state.core_.first_exception_) | ||||||
| 672 | + | std::rethrow_exception(state.core_.first_exception_); | ||||||
| 673 | + | |||||||
| 674 | + | if(state.has_error_.load(std::memory_order_relaxed)) | ||||||
| 675 | + | co_return io_result<>{state.first_error_}; | ||||||
| 676 | + | |||||||
| 677 | + | co_return io_result<>{}; | ||||||
| HITGNC | 678 | + | 8 | } | ||||
| 679 | + | |||||||
| 680 | + | /** Execute io_result-returning awaitables concurrently, inspecting error codes. | ||||||
| 681 | + | |||||||
| 682 | + | Overload selected when all children return io_result<Ts...>. | ||||||
| 683 | + | The error_code is lifted out of each child into a single outer | ||||||
| 684 | + | io_result. On success all values are returned; on failure the | ||||||
| 685 | + | first error_code wins. | ||||||
| 686 | + | |||||||
| 687 | + | @par Exception Safety | ||||||
| 688 | + | Exception always beats error_code. If any child throws, the | ||||||
| 689 | + | exception is rethrown regardless of error_code results. | ||||||
| 690 | + | |||||||
| 691 | + | @param awaitables One or more awaitables each returning | ||||||
| 692 | + | io_result<Ts...>. | ||||||
| 693 | + | |||||||
| 694 | + | @return A task yielding io_result<R1, R2, ..., Rn> where each Ri | ||||||
| 695 | + | follows the payload flattening rules. | ||||||
| 442 | */ | 696 | */ | |||||
| 443 | template<IoAwaitable... As> | 697 | template<IoAwaitable... As> | |||||
| 698 | + | requires (sizeof...(As) > 0) | ||||||
| 699 | + | && detail::all_io_result_awaitables<As...> | ||||||
| HITCBC | 444 | 59 | [[nodiscard]] auto when_all(As... awaitables) | 700 | 50 | [[nodiscard]] auto when_all(As... awaitables) | ||
| 445 | - | -> task<detail::when_all_result_t<detail::awaitable_result_t<As>...>> | 701 | + | -> task<io_result< | |||
| 702 | + | detail::io_result_payload_t<awaitable_result_t<As>>...>> | ||||||
| 446 | { | 703 | { | |||||
| 447 | - | using result_type = detail::when_all_result_t<detail::awaitable_result_t<As>...>; | 704 | + | using result_type = io_result< | |||
| 448 | - | 705 | + | detail::io_result_payload_t<awaitable_result_t<As>>...>; | ||||
| 449 | - | // State is stored in the coroutine frame, using the frame allocator | ||||||
| 450 | - | detail::when_all_state<detail::awaitable_result_t<As>...> state; | ||||||
| 451 | 706 | |||||||
| 452 | - | // Store awaitables in the frame | 707 | + | detail::when_all_state<awaitable_result_t<As>...> state; | |||
| 453 | std::tuple<As...> awaitable_tuple(std::move(awaitables)...); | 708 | std::tuple<As...> awaitable_tuple(std::move(awaitables)...); | |||||
| 454 | 709 | |||||||
| 455 | - | // Launch all awaitables and wait for completion | 710 | + | co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state); | |||
| 456 | - | co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state); | ||||||
| 457 | 711 | |||||||
| 458 | - | // Propagate first exception if any. | 712 | + | // Exception always wins over error_code | |||
| 459 | - | // Safe without explicit acquire: capture_exception() is sequenced-before | 713 | + | if(state.core_.first_exception_) | |||
| 460 | - | // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the | 714 | + | std::rethrow_exception(state.core_.first_exception_); | |||
| 461 | - | // last task's decrement that resumes this coroutine. | ||||||
| 462 | - | if(state.first_exception_) | ||||||
| 463 | - | std::rethrow_exception(state.first_exception_); | ||||||
| 464 | 715 | |||||||
| 465 | - | // Extract and return results | 716 | + | auto r = detail::build_when_all_io_result<result_type>( | |||
| 466 | - | if constexpr (std::is_void_v<result_type>) | 717 | + | detail::extract_results(state)); | |||
| 467 | - | co_return; | 718 | + | if(state.has_error_.load(std::memory_order_relaxed)) | |||
| 468 | - | else | 719 | + | r.ec = state.first_error_; | |||
| 469 | - | co_return detail::extract_results(state); | 720 | + | co_return r; | |||
| DCB | 470 | - | 118 | |||||
| 471 | - | /// Compute the result type of `when_all` for the given task types. | ||||||
| 472 | - | template<typename... Ts> | ||||||
| 473 | - | using when_all_result_type = detail::when_all_result_t<Ts...>; | ||||||
| HITGIC | 474 | } | 721 | 100 | } | |||
| 475 | 722 | |||||||
| 476 | } // namespace capy | 723 | } // namespace capy | |||||
| 477 | } // namespace boost | 724 | } // namespace boost | |||||
| 478 | 725 | |||||||
| 479 | #endif | 726 | #endif | |||||