98.14% Lines (158/161) 97.73% Functions (43/44)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Steve Gerbino 2   // Copyright (c) 2026 Steve Gerbino
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_WHEN_ALL_HPP 10   #ifndef BOOST_CAPY_WHEN_ALL_HPP
11   #define BOOST_CAPY_WHEN_ALL_HPP 11   #define BOOST_CAPY_WHEN_ALL_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
  14 + #include <boost/capy/detail/io_result_combinators.hpp>
  15 + #include <boost/capy/continuation.hpp>
14   #include <boost/capy/concept/executor.hpp> 16   #include <boost/capy/concept/executor.hpp>
15   #include <boost/capy/concept/io_awaitable.hpp> 17   #include <boost/capy/concept/io_awaitable.hpp>
16   #include <coroutine> 18   #include <coroutine>
  19 + #include <boost/capy/ex/frame_alloc_mixin.hpp>
17   #include <boost/capy/ex/io_env.hpp> 20   #include <boost/capy/ex/io_env.hpp>
18   #include <boost/capy/ex/frame_allocator.hpp> 21   #include <boost/capy/ex/frame_allocator.hpp>
19   #include <boost/capy/task.hpp> 22   #include <boost/capy/task.hpp>
20   23  
21   #include <array> 24   #include <array>
22   #include <atomic> 25   #include <atomic>
23   #include <exception> 26   #include <exception>
  27 + #include <memory>
24   #include <optional> 28   #include <optional>
  29 + #include <ranges>
  30 + #include <stdexcept>
25   #include <stop_token> 31   #include <stop_token>
26   #include <tuple> 32   #include <tuple>
27   #include <type_traits> 33   #include <type_traits>
28   #include <utility> 34   #include <utility>
  35 + #include <vector>
29   36  
30   namespace boost { 37   namespace boost {
31   namespace capy { 38   namespace capy {
32   39  
33   namespace detail { 40   namespace detail {
34 - /** Type trait to filter void types from a tuple.  
35 -  
36 - Void-returning tasks do not contribute a value to the result tuple.  
37 - This trait computes the filtered result type.  
38 -  
39 - Example: filter_void_tuple_t<int, void, string> = tuple<int, string>  
40 - */  
41 - template<typename T>  
42 - using wrap_non_void_t = std::conditional_t<std::is_void_v<T>, std::tuple<>, std::tuple<T>>;  
43 -  
44 - template<typename... Ts>  
45 - using filter_void_tuple_t = decltype(std::tuple_cat(std::declval<wrap_non_void_t<Ts>>()...));  
46 -  
47   41  
48   /** Holds the result of a single task within when_all. 42   /** Holds the result of a single task within when_all.
49   */ 43   */
50   template<typename T> 44   template<typename T>
51   struct result_holder 45   struct result_holder
52   { 46   {
53   std::optional<T> value_; 47   std::optional<T> value_;
54   48  
HITCBC 55   60 void set(T v) 49   81 void set(T v)
56   { 50   {
HITCBC 57   60 value_ = std::move(v); 51   81 value_ = std::move(v);
HITCBC 58   60 } 52   81 }
59   53  
HITCBC 60   53 T get() && 54   69 T get() &&
61   { 55   {
HITCBC 62   53 return std::move(*value_); 56   69 return std::move(*value_);
63   } 57   }
64   }; 58   };
65   59  
66 - /** Specialization for void tasks - no value storage needed. 60 + /** Core shared state for when_all operations.
67 - */  
68 - template<>  
69 - struct result_holder<void>  
70 - {  
71 - };  
72   61  
73 - /** Shared state for when_all operation. 62 + Contains all members and methods common to both heterogeneous (variadic)
  63 + and homogeneous (range) when_all implementations. State classes embed
  64 + this via composition to avoid CRTP destructor ordering issues.
74   65  
75 - @tparam Ts The result types of the tasks. 66 + @par Thread Safety
  67 + Atomic operations protect exception capture and completion count.
76   */ 68   */
77 - template<typename... Ts> 69 + struct when_all_core
78 - struct when_all_state  
79 - static constexpr std::size_t task_count = sizeof...(Ts);  
80 -  
81 - // Completion tracking - when_all waits for all children  
82   { 70   {
83   std::atomic<std::size_t> remaining_count_; 71   std::atomic<std::size_t> remaining_count_;
84 - // Result storage in input order  
85 - std::tuple<result_holder<Ts>...> results_;  
86 -  
87 - // Runner handles - destroyed in await_resume while allocator is valid  
88 - std::array<std::coroutine_handle<>, task_count> runner_handles_{};  
89 -  
90   72  
91   // Exception storage - first error wins, others discarded 73   // Exception storage - first error wins, others discarded
92   std::atomic<bool> has_exception_{false}; 74   std::atomic<bool> has_exception_{false};
93   std::exception_ptr first_exception_; 75   std::exception_ptr first_exception_;
94 - // Stop propagation - on error, request stop for siblings  
95   76  
96   std::stop_source stop_source_; 77   std::stop_source stop_source_;
97   78  
98 - // Connects parent's stop_token to our stop_source 79 + // Bridges parent's stop token to our stop_source
99   struct stop_callback_fn 80   struct stop_callback_fn
100   { 81   {
101   std::stop_source* source_; 82   std::stop_source* source_;
HITCBC 102   4 void operator()() const { source_->request_stop(); } 83   1 void operator()() const { source_->request_stop(); }
103   }; 84   };
104   using stop_callback_t = std::stop_callback<stop_callback_fn>; 85   using stop_callback_t = std::stop_callback<stop_callback_fn>;
105   std::optional<stop_callback_t> parent_stop_callback_; 86   std::optional<stop_callback_t> parent_stop_callback_;
106   87  
107 - // Parent resumption 88 + continuation continuation_;
108 - std::coroutine_handle<> continuation_;  
109   io_env const* caller_env_ = nullptr; 89   io_env const* caller_env_ = nullptr;
110   90  
HITCBC 111 - 59 when_all_state() 91 + 72 explicit when_all_core(std::size_t count) noexcept
HITCBC 112 - 59 : remaining_count_(task_count) 92 + 72 : remaining_count_(count)
113   { 93   {
HITCBC 114   59 } 94   72 }
115   95  
116 - // Runners self-destruct in final_suspend. No destruction needed here. 96 + /** Capture an exception (first one wins). */
117 -  
118 - /** Capture an exception (first one wins).  
119 - */  
HITCBC 120   20 void capture_exception(std::exception_ptr ep) 97   19 void capture_exception(std::exception_ptr ep)
121   { 98   {
HITCBC 122   20 bool expected = false; 99   19 bool expected = false;
HITCBC 123   20 if(has_exception_.compare_exchange_strong( 100   19 if(has_exception_.compare_exchange_strong(
124   expected, true, std::memory_order_relaxed)) 101   expected, true, std::memory_order_relaxed))
HITCBC 125   17 first_exception_ = ep; 102   17 first_exception_ = ep;
HITCBC 126   20 } 103   19 }
  104 + };
  105 +
  106 + /** Shared state for heterogeneous when_all (variadic overload).
  107 +
  108 + @tparam Ts The result types of the tasks.
  109 + */
  110 + template<typename... Ts>
  111 + struct when_all_state
  112 + {
  113 + static constexpr std::size_t task_count = sizeof...(Ts);
127   114  
  115 + when_all_core core_;
  116 + std::tuple<result_holder<Ts>...> results_;
  117 + std::array<continuation, task_count> runner_handles_{};
  118 +
  119 + std::atomic<bool> has_error_{false};
  120 + std::error_code first_error_;
  121 +
HITGNC   122 + 50 when_all_state()
HITGNC   123 + 50 : core_(task_count)
  124 + {
HITGNC   125 + 50 }
  126 +
  127 + /** Record the first error (subsequent errors are discarded). */
HITGNC   128 + 43 void record_error(std::error_code ec)
  129 + {
HITGNC   130 + 43 bool expected = false;
HITGNC   131 + 43 if(has_error_.compare_exchange_strong(
  132 + expected, true, std::memory_order_relaxed))
HITGNC   133 + 29 first_error_ = ec;
HITGNC   134 + 43 }
128   }; 135   };
129   136  
130 - /** Wrapper coroutine that intercepts task completion. 137 + /** Shared state for homogeneous when_all (range overload).
131   138  
132 - This runner awaits its assigned task and stores the result in 139 + Stores extracted io_result payloads in a vector indexed by task
133 - the shared state, or captures the exception and requests stop. 140 + position. Tracks the first error_code for error propagation.
  141 +
  142 + @tparam T The payload type extracted from io_result.
134   */ 143   */
135 - template<typename T, typename... Ts> 144 + template<typename T>
136 - struct when_all_runner 145 + struct when_all_homogeneous_state
137   { 146   {
138 - struct promise_type // : frame_allocating_base // DISABLED FOR TESTING 147 + when_all_core core_;
  148 + std::vector<std::optional<T>> results_;
  149 + std::unique_ptr<continuation[]> runner_handles_;
  150 +
  151 + std::atomic<bool> has_error_{false};
  152 + std::error_code first_error_;
  153 +
HITGNC   154 + 11 explicit when_all_homogeneous_state(std::size_t count)
HITGNC   155 + 11 : core_(count)
HITGNC   156 + 22 , results_(count)
HITGNC   157 + 11 , runner_handles_(std::make_unique<continuation[]>(count))
139   { 158   {
HITGIC 140 - when_all_state<Ts...>* state_ = nullptr; 159 + 11 }
  160 +
HITGNC   161 + 16 void set_result(std::size_t index, T value)
  162 + {
HITGNC   163 + 16 results_[index].emplace(std::move(value));
HITGNC   164 + 16 }
  165 +
  166 + /** Record the first error (subsequent errors are discarded). */
HITGNC   167 + 7 void record_error(std::error_code ec)
  168 + {
HITGNC   169 + 7 bool expected = false;
HITGNC   170 + 7 if(has_error_.compare_exchange_strong(
  171 + expected, true, std::memory_order_relaxed))
HITGNC   172 + 5 first_error_ = ec;
HITGNC   173 + 7 }
  174 + };
  175 +
  176 + /** Specialization for void io_result children (no payload storage). */
  177 + template<>
  178 + struct when_all_homogeneous_state<std::tuple<>>
  179 + {
  180 + when_all_core core_;
  181 + std::unique_ptr<continuation[]> runner_handles_;
  182 +
  183 + std::atomic<bool> has_error_{false};
  184 + std::error_code first_error_;
  185 +
HITGNC   186 + 3 explicit when_all_homogeneous_state(std::size_t count)
HITGNC   187 + 3 : core_(count)
HITGNC   188 + 3 , runner_handles_(std::make_unique<continuation[]>(count))
  189 + {
HITGNC   190 + 3 }
  191 +
  192 + /** Record the first error (subsequent errors are discarded). */
HITGNC   193 + 1 void record_error(std::error_code ec)
  194 + {
HITGNC   195 + 1 bool expected = false;
HITGNC   196 + 1 if(has_error_.compare_exchange_strong(
  197 + expected, true, std::memory_order_relaxed))
HITGNC   198 + 1 first_error_ = ec;
HITGNC   199 + 1 }
  200 + };
  201 +
  202 + /** Wrapper coroutine that intercepts task completion for when_all.
  203 +
  204 + Parameterized on StateType to work with both heterogeneous (variadic)
  205 + and homogeneous (range) state types. All state types expose their
  206 + shared members through a `core_` member of type when_all_core.
  207 +
  208 + @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
  209 + */
  210 + template<typename StateType>
  211 + struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner
  212 + {
  213 + struct promise_type
  214 + : frame_alloc_mixin
  215 + {
  216 + StateType* state_ = nullptr;
  217 + std::size_t index_ = 0;
141   io_env env_; 218   io_env env_;
142   219  
HITCBC 143 - 130 when_all_runner get_return_object() 220 + 145 when_all_runner get_return_object() noexcept
144   { 221   {
ECB 145 - 130 return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this)); 222 + return when_all_runner(
HITGNC   223 + 145 std::coroutine_handle<promise_type>::from_promise(*this));
146   } 224   }
147   225  
HITCBC 148   130 std::suspend_always initial_suspend() noexcept 226   145 std::suspend_always initial_suspend() noexcept
149   { 227   {
HITCBC 150   130 return {}; 228   145 return {};
151   } 229   }
152   230  
HITCBC 153   130 auto final_suspend() noexcept 231   145 auto final_suspend() noexcept
154   { 232   {
155   struct awaiter 233   struct awaiter
156   { 234   {
157   promise_type* p_; 235   promise_type* p_;
HITGIC 158 - 236 + 145 bool await_ready() const noexcept { return false; }
HITCBC 159 - 130 bool await_ready() const noexcept 237 + 145 auto await_suspend(std::coroutine_handle<> h) noexcept
160 - {  
DCB 161 - 130 return false;  
162 - }  
163 -  
DCB 164 - 130 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept  
165   { 238   {
HITGIC 166 - // Extract everything needed before self-destruction. 239 + 145 auto& core = p_->state_->core_;
HITCBC 167 - 130 auto* state = p_->state_; 240 + 145 auto* counter = &core.remaining_count_;
HITCBC 168 - 130 auto* counter = &state->remaining_count_; 241 + 145 auto* caller_env = core.caller_env_;
HITCBC 169 - 130 auto* caller_env = state->caller_env_; 242 + 145 auto& cont = core.continuation_;
DCB 170 - 130 auto cont = state->continuation_;  
171   243  
HITCBC 172   130 h.destroy(); 244   145 h.destroy();
173 - // If last runner, dispatch parent for symmetric transfer.  
174   245  
HITCBC 175   130 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); 246   145 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
HITCBC 176   130 if(remaining == 1) 247   145 if(remaining == 1)
HITCBC 177 - 59 return caller_env->executor.dispatch(cont); 248 + 72 return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
HITCBC 178 - 71 return std::noop_coroutine(); 249 + 73 return detail::symmetric_transfer(std::noop_coroutine());
179 - }  
180 -  
DUB 181 - void await_resume() const noexcept  
182 - {  
EUB 183   } 250   }
MISUNC   251 + void await_resume() const noexcept {}
184   }; 252   };
HITCBC 185   130 return awaiter{this}; 253   145 return awaiter{this};
186   } 254   }
187   255  
HITCBC 188 - 110 void return_void() 256 + 126 void return_void() noexcept {}
189 - {  
DCB 190 - 110 }  
191   257  
HITCBC 192 - 20 void unhandled_exception() 258 + 19 void unhandled_exception() noexcept
193   { 259   {
HITCBC 194 - 20 state_->capture_exception(std::current_exception()); 260 + 19 state_->core_.capture_exception(std::current_exception());
HITGIC 195 - // Request stop for sibling tasks 261 + 19 state_->core_.stop_source_.request_stop();
DCB 196 - 20 state_->stop_source_.request_stop();  
HITCBC 197   20 } 262   19 }
198   263  
199   template<class Awaitable> 264   template<class Awaitable>
200   struct transform_awaiter 265   struct transform_awaiter
201   { 266   {
202   std::decay_t<Awaitable> a_; 267   std::decay_t<Awaitable> a_;
203   promise_type* p_; 268   promise_type* p_;
204   269  
HITCBC 205 - 130 bool await_ready() 270 + 145 bool await_ready() { return a_.await_ready(); }
HITGIC 206 - { 271 + 145 decltype(auto) await_resume() { return a_.await_resume(); }
DCB 207 - 130 return a_.await_ready();  
208 - }  
209 -  
DCB 210 - 130 decltype(auto) await_resume()  
211 - {  
DCB 212 - 130 return a_.await_resume();  
213 - }  
214   272  
215   template<class Promise> 273   template<class Promise>
HITCBC 216   129 auto await_suspend(std::coroutine_handle<Promise> h) 274   144 auto await_suspend(std::coroutine_handle<Promise> h)
217   { 275   {
ECB 218 - 129 return a_.await_suspend(h, &p_->env_); 276 + using R = decltype(a_.await_suspend(h, &p_->env_));
  277 + if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
HITGNC   278 + 144 return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
  279 + else
  280 + return a_.await_suspend(h, &p_->env_);
219   } 281   }
220   }; 282   };
221   283  
222   template<class Awaitable> 284   template<class Awaitable>
HITCBC 223   130 auto await_transform(Awaitable&& a) 285   145 auto await_transform(Awaitable&& a)
224   { 286   {
225   using A = std::decay_t<Awaitable>; 287   using A = std::decay_t<Awaitable>;
226   if constexpr (IoAwaitable<A>) 288   if constexpr (IoAwaitable<A>)
227   { 289   {
228   return transform_awaiter<Awaitable>{ 290   return transform_awaiter<Awaitable>{
HITCBC 229   260 std::forward<Awaitable>(a), this}; 291   290 std::forward<Awaitable>(a), this};
230   } 292   }
231   else 293   else
232   { 294   {
233   static_assert(sizeof(A) == 0, "requires IoAwaitable"); 295   static_assert(sizeof(A) == 0, "requires IoAwaitable");
234   } 296   }
HITCBC 235   130 } 297   145 }
236   }; 298   };
237   299  
238   std::coroutine_handle<promise_type> h_; 300   std::coroutine_handle<promise_type> h_;
239   301  
HITCBC 240 - 130 explicit when_all_runner(std::coroutine_handle<promise_type> h) 302 + 145 explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
HITCBC 241   130 : h_(h) 303   145 : h_(h)
242   { 304   {
HITCBC 243   130 } 305   145 }
244   306  
245   // Enable move for all clang versions - some versions need it 307   // Enable move for all clang versions - some versions need it
246 - when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {} 308 + when_all_runner(when_all_runner&& other) noexcept
  309 + : h_(std::exchange(other.h_, nullptr))
  310 + {
  311 + }
247 - // Non-copyable  
248   312  
249   when_all_runner(when_all_runner const&) = delete; 313   when_all_runner(when_all_runner const&) = delete;
250   when_all_runner& operator=(when_all_runner const&) = delete; 314   when_all_runner& operator=(when_all_runner const&) = delete;
251   when_all_runner& operator=(when_all_runner&&) = delete; 315   when_all_runner& operator=(when_all_runner&&) = delete;
252   316  
HITCBC 253   130 auto release() noexcept 317   145 auto release() noexcept
254   { 318   {
HITCBC 255   130 return std::exchange(h_, nullptr); 319   145 return std::exchange(h_, nullptr);
256   } 320   }
257   }; 321   };
258   322  
259 - /** Create a runner coroutine for a single awaitable. 323 + /** Create an io_result-aware runner for a single awaitable (range path).
260   324  
261 - Awaitable is passed directly to ensure proper coroutine frame storage. 325 + Checks the error code, records errors and requests stop on failure,
  326 + or extracts the payload on success.
262   */ 327   */
263 - template<std::size_t Index, IoAwaitable Awaitable, typename... Ts> 328 + template<IoAwaitable Awaitable, typename StateType>
264 - when_all_runner<awaitable_result_t<Awaitable>, Ts...> 329 + when_all_runner<StateType>
HITCBC 265 - 130 make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state) 330 + 32 make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
266   { 331   {
267 - using T = awaitable_result_t<Awaitable>; 332 + auto result = co_await std::move(inner);
268 - if constexpr (std::is_void_v<T>) 333 +
  334 + if(result.ec)
269   { 335   {
270 - co_await std::move(inner); 336 + state->record_error(result.ec);
  337 + state->core_.stop_source_.request_stop();
271   } 338   }
272   else 339   else
273   { 340   {
274 - std::get<Index>(state->results_).set(co_await std::move(inner)); 341 + using PayloadT = io_result_payload_t<
  342 + awaitable_result_t<Awaitable>>;
  343 + if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
  344 + {
  345 + state->set_result(index,
  346 + extract_io_payload(std::move(result)));
  347 + }
275   } 348   }
HITCBC 276   260 } 349   64 }
277   350  
278 - /** Internal awaitable that launches all runner coroutines and waits. 351 + /** Create a runner for io_result children that requests stop on ec. */
  352 + template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
  353 + when_all_runner<when_all_state<Ts...>>
HITGNC   354 + 97 make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
  355 + {
  356 + auto result = co_await std::move(inner);
  357 + auto ec = result.ec;
  358 + std::get<Index>(state->results_).set(std::move(result));
279   359  
280 - This awaitable is used inside the when_all coroutine to handle 360 + if(ec)
281 - the concurrent execution of child awaitables. 361 + {
282 - */ 362 + state->record_error(ec);
  363 + state->core_.stop_source_.request_stop();
  364 + }
HITGNC   365 + 194 }
  366 +
  367 + /** Launcher that uses io_result-aware runners. */
283   template<IoAwaitable... Awaitables> 368   template<IoAwaitable... Awaitables>
284 - class when_all_launcher 369 + class when_all_io_launcher
285   { 370   {
286   using state_type = when_all_state<awaitable_result_t<Awaitables>...>; 371   using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
287   372  
288   std::tuple<Awaitables...>* awaitables_; 373   std::tuple<Awaitables...>* awaitables_;
289   state_type* state_; 374   state_type* state_;
290   375  
291   public: 376   public:
HITCBC 292 - 59 when_all_launcher( 377 + 50 when_all_io_launcher(
293   std::tuple<Awaitables...>* awaitables, 378   std::tuple<Awaitables...>* awaitables,
294   state_type* state) 379   state_type* state)
HITCBC 295   59 : awaitables_(awaitables) 380   50 : awaitables_(awaitables)
HITCBC 296   59 , state_(state) 381   50 , state_(state)
297   { 382   {
HITCBC 298   59 } 383   50 }
299   384  
HITCBC 300   59 bool await_ready() const noexcept 385   50 bool await_ready() const noexcept
301   { 386   {
HITCBC 302   59 return sizeof...(Awaitables) == 0; 387   50 return sizeof...(Awaitables) == 0;
303   } 388   }
304   389  
HITCBC 305 - 59 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env) 390 + 50 std::coroutine_handle<> await_suspend(
  391 + std::coroutine_handle<> continuation, io_env const* caller_env)
306   { 392   {
HITCBC 307 - 59 state_->continuation_ = continuation; 393 + 50 state_->core_.continuation_.h = continuation;
HITCBC 308 - 59 state_->caller_env_ = caller_env; 394 + 50 state_->core_.caller_env_ = caller_env;
309 - // Forward parent's stop requests to children  
310   395  
HITCBC 311   59 if(caller_env->stop_token.stop_possible()) 396   50 if(caller_env->stop_token.stop_possible())
312   { 397   {
HITCBC 313 - 16 state_->parent_stop_callback_.emplace( 398 + 2 state_->core_.parent_stop_callback_.emplace(
HITCBC 314   8 caller_env->stop_token, 399   1 caller_env->stop_token,
HITCBC 315 - 8 typename state_type::stop_callback_fn{&state_->stop_source_}); 400 + 1 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
316   401  
HITCBC 317   8 if(caller_env->stop_token.stop_requested()) 402   1 if(caller_env->stop_token.stop_requested())
MISLBC 318 - 4 state_->stop_source_.request_stop(); 403 + state_->core_.stop_source_.request_stop();
319   } 404   }
320   405  
HITGIC 321 - // CRITICAL: If the last task finishes synchronously then the parent 406 + 50 auto token = state_->core_.stop_source_.get_token();
322 - // coroutine resumes, destroying its frame, and destroying this object  
323 - // prior to the completion of await_suspend. Therefore, await_suspend  
324 - // must ensure `this` cannot be referenced after calling `launch_one`  
325 - // for the last time.  
DCB 326 - 59 auto token = state_->stop_source_.get_token();  
HITCBC 327   58 [&]<std::size_t... Is>(std::index_sequence<Is...>) { 407   46 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
HITCBC 328   59 (..., launch_one<Is>(caller_env->executor, token)); 408   50 (..., launch_one<Is>(caller_env->executor, token));
HITCBC 329   59 }(std::index_sequence_for<Awaitables...>{}); 409   50 }(std::index_sequence_for<Awaitables...>{});
330 - // Let signal_completion() handle resumption  
331   410  
HITCBC 332   118 return std::noop_coroutine(); 411   100 return std::noop_coroutine();
HITCBC 333   59 } 412   50 }
334   413  
HITCBC 335 - 59 void await_resume() const noexcept 414 + 50 void await_resume() const noexcept {}
336 - {  
337 - // Results are extracted by the when_all coroutine from state  
DCB 338 - 59 }  
339   415  
340   private: 416   private:
341   template<std::size_t I> 417   template<std::size_t I>
HITCBC 342   130 void launch_one(executor_ref caller_ex, std::stop_token token) 418   97 void launch_one(executor_ref caller_ex, std::stop_token token)
343   { 419   {
HITCBC 344 - 130 auto runner = make_when_all_runner<I>( 420 + 97 auto runner = make_when_all_io_runner<I>(
HITCBC 345   130 std::move(std::get<I>(*awaitables_)), state_); 421   97 std::move(std::get<I>(*awaitables_)), state_);
346   422  
HITCBC 347   130 auto h = runner.release(); 423   97 auto h = runner.release();
HITCBC 348   130 h.promise().state_ = state_; 424   97 h.promise().state_ = state_;
HITCBC 349 - 130 h.promise().env_ = io_env{caller_ex, token, state_->caller_env_->frame_allocator}; 425 + 97 h.promise().env_ = io_env{caller_ex, token,
HITGNC   426 + 97 state_->core_.caller_env_->frame_allocator};
350   427  
HITCBC 351 - 130 std::coroutine_handle<> ch{h}; 428 + 97 state_->runner_handles_[I].h = std::coroutine_handle<>{h};
HITCBC 352 - 130 state_->runner_handles_[I] = ch; 429 + 97 state_->core_.caller_env_->executor.post(state_->runner_handles_[I]);
DCB 353 - 130 state_->caller_env_->executor.post(ch);  
HITCBC 354   260 } 430   194 }
355   }; 431   };
356   432  
357 - /** Compute the result type for when_all. 433 + /** Helper to extract a single result from state.
358 -  
359 - Returns void when all tasks are void (P2300 aligned),  
360 - otherwise returns a tuple with void types filtered out.  
361 - */  
362 - template<typename... Ts>  
363 - using when_all_result_t = std::conditional_t<  
364 - std::is_same_v<filter_void_tuple_t<Ts...>, std::tuple<>>,  
365 - void,  
366 - filter_void_tuple_t<Ts...>>;  
367 -  
368 - /** Helper to extract a single result, returning empty tuple for void.  
369   This is a separate function to work around a GCC-11 ICE that occurs 434   This is a separate function to work around a GCC-11 ICE that occurs
370   when using nested immediately-invoked lambdas with pack expansion. 435   when using nested immediately-invoked lambdas with pack expansion.
371   */ 436   */
372   template<std::size_t I, typename... Ts> 437   template<std::size_t I, typename... Ts>
HITCBC 373   57 auto extract_single_result(when_all_state<Ts...>& state) 438   69 auto extract_single_result(when_all_state<Ts...>& state)
374   { 439   {
HITGIC 375 - using T = std::tuple_element_t<I, std::tuple<Ts...>>; 440 + 69 return std::move(std::get<I>(state.results_)).get();
376 - if constexpr (std::is_void_v<T>)  
DCB 377 - 4 return std::tuple<>();  
378 - else  
DCB 379 - 53 return std::make_tuple(std::move(std::get<I>(state.results_)).get());  
380   } 441   }
381   442  
382 - /** Extract results from state, filtering void types. 443 + /** Extract all results from state as a tuple.
383   */ 444   */
384   template<typename... Ts> 445   template<typename... Ts>
HITCBC 385   24 auto extract_results(when_all_state<Ts...>& state) 446   36 auto extract_results(when_all_state<Ts...>& state)
386   { 447   {
HITCBC 387   43 return [&]<std::size_t... Is>(std::index_sequence<Is...>) { 448   55 return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
HITCBC 388 - 25 return std::tuple_cat(extract_single_result<Is>(state)...); 449 + 36 return std::tuple(extract_single_result<Is>(state)...);
HITCBC 389   48 }(std::index_sequence_for<Ts...>{}); 450   72 }(std::index_sequence_for<Ts...>{});
390   } 451   }
391   452  
  453 + /** Launches all homogeneous runners concurrently.
  454 +
  455 + Two-phase approach: create all runners first, then post all.
  456 + This avoids lifetime issues if a task completes synchronously.
  457 + */
  458 + template<typename Range>
  459 + class when_all_homogeneous_launcher
  460 + {
  461 + using Awaitable = std::ranges::range_value_t<Range>;
  462 + using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
  463 +
  464 + Range* range_;
  465 + when_all_homogeneous_state<PayloadT>* state_;
  466 +
  467 + public:
HITGNC   468 + 14 when_all_homogeneous_launcher(
  469 + Range* range,
  470 + when_all_homogeneous_state<PayloadT>* state)
HITGNC   471 + 14 : range_(range)
HITGNC   472 + 14 , state_(state)
  473 + {
HITGNC   474 + 14 }
  475 +
HITGNC   476 + 14 bool await_ready() const noexcept
  477 + {
HITGNC   478 + 14 return std::ranges::empty(*range_);
  479 + }
  480 +
HITGNC   481 + 14 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
  482 + {
HITGNC   483 + 14 state_->core_.continuation_.h = continuation;
HITGNC   484 + 14 state_->core_.caller_env_ = caller_env;
  485 +
HITGNC   486 + 14 if(caller_env->stop_token.stop_possible())
  487 + {
HITGNC   488 + 2 state_->core_.parent_stop_callback_.emplace(
HITGNC   489 + 1 caller_env->stop_token,
HITGNC   490 + 1 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
  491 +
HITGNC   492 + 1 if(caller_env->stop_token.stop_requested())
MISUNC   493 + state_->core_.stop_source_.request_stop();
  494 + }
  495 +
HITGNC   496 + 14 auto token = state_->core_.stop_source_.get_token();
  497 +
  498 + // Phase 1: Create all runners without dispatching.
HITGNC   499 + 14 std::size_t index = 0;
HITGNC   500 + 46 for(auto&& a : *range_)
  501 + {
HITGNC   502 + 32 auto runner = make_when_all_homogeneous_runner(
HITGNC   503 + 32 std::move(a), state_, index);
  504 +
HITGNC   505 + 32 auto h = runner.release();
HITGNC   506 + 32 h.promise().state_ = state_;
HITGNC   507 + 32 h.promise().index_ = index;
HITGNC   508 + 32 h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
  509 +
HITGNC   510 + 32 state_->runner_handles_[index].h = std::coroutine_handle<>{h};
HITGNC   511 + 32 ++index;
  512 + }
  513 +
  514 + // Phase 2: Post all runners. Any may complete synchronously.
  515 + // After last post, state_ and this may be destroyed.
HITGNC   516 + 14 auto* handles = state_->runner_handles_.get();
HITGNC   517 + 14 std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
HITGNC   518 + 46 for(std::size_t i = 0; i < count; ++i)
HITGNC   519 + 32 caller_env->executor.post(handles[i]);
  520 +
HITGNC   521 + 28 return std::noop_coroutine();
HITGNC   522 + 46 }
  523 +
HITGNC   524 + 14 void await_resume() const noexcept
  525 + {
HITGNC   526 + 14 }
  527 + };
  528 +
392   } // namespace detail 529   } // namespace detail
393   530  
394 - /** Execute multiple awaitables concurrently and collect their results. 531 + /** Execute a range of io_result-returning awaitables concurrently.
395   532  
396 - Launches all awaitables simultaneously and waits for all to complete 533 + Launches all awaitables simultaneously and waits for all to complete.
397 - before returning. Results are collected in input order. If any 534 + On success, extracted payloads are collected in a vector preserving
398 - awaitable throws, cancellation is requested for siblings and the first 535 + input order. The first error_code cancels siblings and is propagated
399 - exception is rethrown after all awaitables complete. 536 + in the outer io_result. Exceptions always beat error codes.
400   537  
401   @li All child awaitables run concurrently on the caller's executor 538   @li All child awaitables run concurrently on the caller's executor
402 - @li Results are returned as a tuple in input order 539 + @li Payloads are returned as a vector in input order
403 - @li Void-returning awaitables do not contribute to the result tuple 540 + @li First error_code wins and cancels siblings
404 - @li If all awaitables return void, `when_all` returns `task<void>` 541 + @li Exception always beats error_code
405 - @li First exception wins; subsequent exceptions are discarded  
406 - @li Stop is requested for siblings on first error  
407   @li Completes only after all children have finished 542   @li Completes only after all children have finished
408   543  
409   @par Thread Safety 544   @par Thread Safety
410   The returned task must be awaited from a single execution context. 545   The returned task must be awaited from a single execution context.
411   Child awaitables execute concurrently but complete through the caller's 546   Child awaitables execute concurrently but complete through the caller's
412   executor. 547   executor.
413   548  
414 - @param awaitables The awaitables to execute concurrently. Each must 549 + @param awaitables Range of io_result-returning awaitables to execute
415 - satisfy @ref IoAwaitable and is consumed (moved-from) when 550 + concurrently (must not be empty).
416 - `when_all` is awaited.  
417   551  
418 - @return A task yielding a tuple of non-void results. Returns 552 + @return A task yielding io_result<vector<PayloadT>> where PayloadT
419 - `task<void>` when all input awaitables return void. 553 + is the payload extracted from each child's io_result.
  554 +
  555 + @throws std::invalid_argument if range is empty (thrown before
  556 + coroutine suspends).
  557 + @throws Rethrows the first child exception after all children
  558 + complete (exception beats error_code).
420   559  
421   @par Example 560   @par Example
  561 + @code
  562 + task<void> example()
  563 + {
  564 + std::vector<io_task<size_t>> reads;
  565 + for (auto& buf : buffers)
  566 + reads.push_back(stream.read_some(buf));
  567 +
  568 + auto [ec, counts] = co_await when_all(std::move(reads));
  569 + if (ec) { // handle error
  570 + }
  571 + }
  572 + @endcode
422   573  
  574 + @see IoAwaitableRange, when_all
  575 + */
  576 + template<IoAwaitableRange R>
  577 + requires detail::is_io_result_v<
  578 + awaitable_result_t<std::ranges::range_value_t<R>>>
  579 + && (!std::is_same_v<
  580 + detail::io_result_payload_t<
  581 + awaitable_result_t<std::ranges::range_value_t<R>>>,
  582 + std::tuple<>>)
HITGNC   583 + 12 [[nodiscard]] auto when_all(R&& awaitables)
  584 + -> task<io_result<std::vector<
  585 + detail::io_result_payload_t<
  586 + awaitable_result_t<std::ranges::range_value_t<R>>>>>>
  587 + {
  588 + using Awaitable = std::ranges::range_value_t<R>;
  589 + using PayloadT = detail::io_result_payload_t<
  590 + awaitable_result_t<Awaitable>>;
  591 + using OwnedRange = std::remove_cvref_t<R>;
  592 +
  593 + auto count = std::ranges::size(awaitables);
  594 + if(count == 0)
  595 + throw std::invalid_argument("when_all requires at least one awaitable");
  596 +
  597 + OwnedRange owned_awaitables = std::forward<R>(awaitables);
  598 +
  599 + detail::when_all_homogeneous_state<PayloadT> state(count);
  600 +
  601 + co_await detail::when_all_homogeneous_launcher<OwnedRange>(
  602 + &owned_awaitables, &state);
  603 +
  604 + if(state.core_.first_exception_)
  605 + std::rethrow_exception(state.core_.first_exception_);
  606 +
  607 + if(state.has_error_.load(std::memory_order_relaxed))
  608 + co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
  609 +
  610 + std::vector<PayloadT> results;
  611 + results.reserve(count);
  612 + for(auto& opt : state.results_)
  613 + results.push_back(std::move(*opt));
  614 +
  615 + co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
HITGNC   616 + 24 }
  617 +
  618 + /** Execute a range of void io_result-returning awaitables concurrently.
  619 +
  620 + Launches all awaitables simultaneously and waits for all to complete.
  621 + Since all awaitables return io_result<>, no payload values are
  622 + collected. The first error_code cancels siblings and is propagated.
  623 + Exceptions always beat error codes.
  624 +
  625 + @param awaitables Range of io_result<>-returning awaitables to
  626 + execute concurrently (must not be empty).
  627 +
  628 + @return A task yielding io_result<> whose ec is the first child
  629 + error, or default-constructed on success.
  630 +
  631 + @throws std::invalid_argument if range is empty.
  632 + @throws Rethrows the first child exception after all children
  633 + complete (exception beats error_code).
  634 +
  635 + @par Example
423   @code 636   @code
424 - task<> example() 637 + task<void> example()
425   { 638   {
426 - // Concurrent fetch, results collected in order 639 + std::vector<io_task<>> jobs;
427 - auto [user, posts] = co_await when_all( 640 + for (int i = 0; i < n; ++i)
428 - fetch_user( id ), // task<User> 641 + jobs.push_back(process(i));
429 - fetch_posts( id ) // task<std::vector<Post>>  
430 - );  
431   642  
432 - // Void awaitables don't contribute to result 643 + auto [ec] = co_await when_all(std::move(jobs));
433 - co_await when_all(  
434 - log_event( "start" ), // task<void>  
435 - notify_user( id ) // task<void>  
436 - );  
437 - // Returns task<void>, no result tuple  
438   } 644   }
439   @endcode 645   @endcode
440   646  
441 - @see IoAwaitable, task 647 + @see IoAwaitableRange, when_all
  648 + */
  649 + template<IoAwaitableRange R>
  650 + requires detail::is_io_result_v<
  651 + awaitable_result_t<std::ranges::range_value_t<R>>>
  652 + && std::is_same_v<
  653 + detail::io_result_payload_t<
  654 + awaitable_result_t<std::ranges::range_value_t<R>>>,
  655 + std::tuple<>>
HITGNC   656 + 4 [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
  657 + {
  658 + using OwnedRange = std::remove_cvref_t<R>;
  659 +
  660 + auto count = std::ranges::size(awaitables);
  661 + if(count == 0)
  662 + throw std::invalid_argument("when_all requires at least one awaitable");
  663 +
  664 + OwnedRange owned_awaitables = std::forward<R>(awaitables);
  665 +
  666 + detail::when_all_homogeneous_state<std::tuple<>> state(count);
  667 +
  668 + co_await detail::when_all_homogeneous_launcher<OwnedRange>(
  669 + &owned_awaitables, &state);
  670 +
  671 + if(state.core_.first_exception_)
  672 + std::rethrow_exception(state.core_.first_exception_);
  673 +
  674 + if(state.has_error_.load(std::memory_order_relaxed))
  675 + co_return io_result<>{state.first_error_};
  676 +
  677 + co_return io_result<>{};
HITGNC   678 + 8 }
  679 +
  680 + /** Execute io_result-returning awaitables concurrently, inspecting error codes.
  681 +
  682 + Overload selected when all children return io_result<Ts...>.
  683 + The error_code is lifted out of each child into a single outer
  684 + io_result. On success all values are returned; on failure the
  685 + first error_code wins.
  686 +
  687 + @par Exception Safety
  688 + Exception always beats error_code. If any child throws, the
  689 + exception is rethrown regardless of error_code results.
  690 +
  691 + @param awaitables One or more awaitables each returning
  692 + io_result<Ts...>.
  693 +
  694 + @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
  695 + follows the payload flattening rules.
442   */ 696   */
443   template<IoAwaitable... As> 697   template<IoAwaitable... As>
  698 + requires (sizeof...(As) > 0)
  699 + && detail::all_io_result_awaitables<As...>
HITCBC 444   59 [[nodiscard]] auto when_all(As... awaitables) 700   50 [[nodiscard]] auto when_all(As... awaitables)
445 - -> task<detail::when_all_result_t<detail::awaitable_result_t<As>...>> 701 + -> task<io_result<
  702 + detail::io_result_payload_t<awaitable_result_t<As>>...>>
446   { 703   {
447 - using result_type = detail::when_all_result_t<detail::awaitable_result_t<As>...>; 704 + using result_type = io_result<
448 - 705 + detail::io_result_payload_t<awaitable_result_t<As>>...>;
449 - // State is stored in the coroutine frame, using the frame allocator  
450 - detail::when_all_state<detail::awaitable_result_t<As>...> state;  
451   706  
452 - // Store awaitables in the frame 707 + detail::when_all_state<awaitable_result_t<As>...> state;
453   std::tuple<As...> awaitable_tuple(std::move(awaitables)...); 708   std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
454   709  
455 - // Launch all awaitables and wait for completion 710 + co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
456 - co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state);  
457   711  
458 - // Propagate first exception if any. 712 + // Exception always wins over error_code
459 - // Safe without explicit acquire: capture_exception() is sequenced-before 713 + if(state.core_.first_exception_)
460 - // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the 714 + std::rethrow_exception(state.core_.first_exception_);
461 - // last task's decrement that resumes this coroutine.  
462 - if(state.first_exception_)  
463 - std::rethrow_exception(state.first_exception_);  
464   715  
465 - // Extract and return results 716 + auto r = detail::build_when_all_io_result<result_type>(
466 - if constexpr (std::is_void_v<result_type>) 717 + detail::extract_results(state));
467 - co_return; 718 + if(state.has_error_.load(std::memory_order_relaxed))
468 - else 719 + r.ec = state.first_error_;
469 - co_return detail::extract_results(state); 720 + co_return r;
DCB 470 - 118  
471 - /// Compute the result type of `when_all` for the given task types.  
472 - template<typename... Ts>  
473 - using when_all_result_type = detail::when_all_result_t<Ts...>;  
HITGIC 474   } 721   100 }
475   722  
476   } // namespace capy 723   } // namespace capy
477   } // namespace boost 724   } // namespace boost
478   725  
479   #endif 726   #endif