93.94% Lines (155/165) 92.86% Functions (39/42)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Michael Vandeberg 2   // Copyright (c) 2026 Michael Vandeberg
  3 + // Copyright (c) 2026 Steve Gerbino
3   // 4   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 7   //
7   // Official repository: https://github.com/cppalliance/capy 8   // Official repository: https://github.com/cppalliance/capy
8   // 9   //
9   10  
10   #ifndef BOOST_CAPY_WHEN_ANY_HPP 11   #ifndef BOOST_CAPY_WHEN_ANY_HPP
11   #define BOOST_CAPY_WHEN_ANY_HPP 12   #define BOOST_CAPY_WHEN_ANY_HPP
12   13  
13   #include <boost/capy/detail/config.hpp> 14   #include <boost/capy/detail/config.hpp>
  15 + #include <boost/capy/detail/io_result_combinators.hpp>
  16 + #include <boost/capy/continuation.hpp>
14   #include <boost/capy/concept/executor.hpp> 17   #include <boost/capy/concept/executor.hpp>
15   #include <boost/capy/concept/io_awaitable.hpp> 18   #include <boost/capy/concept/io_awaitable.hpp>
16   #include <coroutine> 19   #include <coroutine>
17   #include <boost/capy/ex/executor_ref.hpp> 20   #include <boost/capy/ex/executor_ref.hpp>
  21 + #include <boost/capy/ex/frame_alloc_mixin.hpp>
18   #include <boost/capy/ex/frame_allocator.hpp> 22   #include <boost/capy/ex/frame_allocator.hpp>
19   #include <boost/capy/ex/io_env.hpp> 23   #include <boost/capy/ex/io_env.hpp>
20   #include <boost/capy/task.hpp> 24   #include <boost/capy/task.hpp>
21   25  
22   #include <array> 26   #include <array>
23   #include <atomic> 27   #include <atomic>
24   #include <exception> 28   #include <exception>
  29 + #include <memory>
  30 + #include <mutex>
25   #include <optional> 31   #include <optional>
26   #include <ranges> 32   #include <ranges>
27   #include <stdexcept> 33   #include <stdexcept>
28   #include <stop_token> 34   #include <stop_token>
29   #include <tuple> 35   #include <tuple>
30   #include <type_traits> 36   #include <type_traits>
31   #include <utility> 37   #include <utility>
32   #include <variant> 38   #include <variant>
33   #include <vector> 39   #include <vector>
34   40  
35   /* 41   /*
36 - when_any - Race multiple tasks, return first completion 42 + when_any - Race multiple io_result tasks, select first success
37 - ======================================================== 43 + =============================================================
38   44  
39   OVERVIEW: 45   OVERVIEW:
40   --------- 46   ---------
41 - when_any launches N tasks concurrently and completes when the FIRST task 47 + when_any launches N io_result-returning tasks concurrently. A task
42 - finishes (success or failure). It then requests stop for all siblings and 48 + wins by returning !ec; errors and exceptions do not win. Once a
43 - waits for them to acknowledge before returning. 49 + winner is found, stop is requested for siblings and the winner's
  50 + payload is returned. If no winner exists (all fail), the last
  51 + recorded failure is reported (error_code returned or exception rethrown).
44   52  
45   ARCHITECTURE: 53   ARCHITECTURE:
46   ------------- 54   -------------
47   The design mirrors when_all but with inverted completion semantics: 55   The design mirrors when_all but with inverted completion semantics:
48   56  
49   when_all: complete when remaining_count reaches 0 (all done) 57   when_all: complete when remaining_count reaches 0 (all done)
50   when_any: complete when has_winner becomes true (first done) 58   when_any: complete when has_winner becomes true (first done)
51   BUT still wait for remaining_count to reach 0 for cleanup 59   BUT still wait for remaining_count to reach 0 for cleanup
52   60  
53   Key components: 61   Key components:
54 - - when_any_state: Shared state tracking winner and completion 62 + - when_any_core: Shared state tracking winner and completion
55 - - when_any_runner: Wrapper coroutine for each child task 63 + - when_any_io_runner: Wrapper coroutine for each child task
56 - - when_any_launcher: Awaitable that starts all runners concurrently 64 + - when_any_io_launcher/when_any_io_homogeneous_launcher:
  65 + Awaitables that start all runners concurrently
57   66  
58   CRITICAL INVARIANTS: 67   CRITICAL INVARIANTS:
59   -------------------- 68   --------------------
60 - 1. Exactly one task becomes the winner (via atomic compare_exchange) 69 + 1. Only a task returning !ec can become the winner (via atomic CAS)
61   2. All tasks must complete before parent resumes (cleanup safety) 70   2. All tasks must complete before parent resumes (cleanup safety)
62   3. Stop is requested immediately when winner is determined 71   3. Stop is requested immediately when winner is determined
63 - 4. Only the winner's result/exception is stored 72 + 4. Exceptions and errors do not claim winner status
64   73  
65 - TYPE DEDUPLICATION: 74 + POSITIONAL VARIANT:
66   ------------------- 75   -------------------
67 - std::variant requires unique alternative types. Since when_any can race 76 + The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
68 - tasks with identical return types (e.g., three task<int>), we must 77 + Index 0 is error_code (failure/no-winner). Index 1..N identifies the
69 - deduplicate types before constructing the variant. 78 + winning child and carries its payload.
70 -  
71 - Example: when_any(task<int>, task<string>, task<int>)  
72 - - Raw types after void->monostate: int, string, int  
73 - - Deduplicated variant: std::variant<int, string>  
74 - - Return: pair<size_t, variant<int, string>>  
75 -  
76 - The winner_index tells you which task won (0, 1, or 2), while the variant  
77 - holds the result. Use the index to determine how to interpret the variant.  
78   79  
79 - VOID HANDLING: 80 + RANGE OVERLOAD:
80 - -------------- 81 + ---------------
81 - void tasks contribute std::monostate to the variant (then deduplicated). 82 + The range overload returns variant<error_code, pair<size_t, T>> for
82 - All-void tasks result in: pair<size_t, variant<monostate>> 83 + non-void children or variant<error_code, size_t> for void children.
83   84  
84   MEMORY MODEL: 85   MEMORY MODEL:
85   ------------- 86   -------------
86   Synchronization chain from winner's write to parent's read: 87   Synchronization chain from winner's write to parent's read:
87   88  
88 - 1. Winner thread writes result_/winner_exception_ (non-atomic) 89 + 1. Winner thread writes result_ (non-atomic)
89 - 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_ 90 + 2. Winner thread calls signal_completion() -> fetch_sub(acq_rel) on remaining_count_
90   3. Last task thread (may be winner or non-winner) calls signal_completion() 91   3. Last task thread (may be winner or non-winner) calls signal_completion()
91 - → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0 92 + -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
92   4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer 93   4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
93 - 5. Parent coroutine resumes and reads result_/winner_exception_ 94 + 5. Parent coroutine resumes and reads result_
94   95  
95   Synchronization analysis: 96   Synchronization analysis:
96   - All fetch_sub operations on remaining_count_ form a release sequence 97   - All fetch_sub operations on remaining_count_ form a release sequence
97   - Winner's fetch_sub releases; subsequent fetch_sub operations participate 98   - Winner's fetch_sub releases; subsequent fetch_sub operations participate
98   in the modification order of remaining_count_ 99   in the modification order of remaining_count_
99   - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the 100   - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
100   modification order, establishing happens-before from winner's writes 101   modification order, establishing happens-before from winner's writes
101   - Executor dispatch() is expected to provide queue-based synchronization 102   - Executor dispatch() is expected to provide queue-based synchronization
102   (release-on-post, acquire-on-execute) completing the chain to parent 103   (release-on-post, acquire-on-execute) completing the chain to parent
103   - Even inline executors work (same thread = sequenced-before) 104   - Even inline executors work (same thread = sequenced-before)
104 - Alternative considered: Adding winner_ready_ atomic (set with release after  
105 - storing winner data, acquired before reading) would make synchronization  
106 - self-contained and not rely on executor implementation details. Current  
107 - approach is correct but requires careful reasoning about release sequences  
108 - and executor behavior.  
109 -  
110   105  
111   EXCEPTION SEMANTICS: 106   EXCEPTION SEMANTICS:
112   -------------------- 107   --------------------
113 - Unlike when_all (which captures first exception, discards others), when_any 108 + Exceptions do NOT claim winner status. If a child throws, the exception
114 - treats exceptions as valid completions. If the winning task threw, that 109 + is recorded but the combinator keeps waiting for a success. Only when
115 - exception is rethrown. Exceptions from non-winners are silently discarded. 110 + all children complete without a winner does the combinator check: if
  111 + the last recorded failure was an exception, it is rethrown (last writer wins).
116   */ 112   */
117   113  
118   namespace boost { 114   namespace boost {
119   namespace capy { 115   namespace capy {
120   116  
121   namespace detail { 117   namespace detail {
122 - /** Convert void to monostate for variant storage.  
123 -  
124 - std::variant<void, ...> is ill-formed, so void tasks contribute  
125 - std::monostate to the result variant instead. Non-void types  
126 - pass through unchanged.  
127 -  
128 - @tparam T The type to potentially convert (void becomes monostate).  
129 - */  
130 - template<typename T>  
131 - using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;  
132 -  
133 - // Type deduplication: std::variant requires unique alternative types.  
134 - // Fold left over the type list, appending each type only if not already present.  
135 - template<typename Variant, typename T>  
136 - struct variant_append_if_unique;  
137 -  
138 - template<typename... Vs, typename T>  
139 - struct variant_append_if_unique<std::variant<Vs...>, T>  
140 - {  
141 - using type = std::conditional_t<  
142 - (std::is_same_v<T, Vs> || ...),  
143 - std::variant<Vs...>,  
144 - std::variant<Vs..., T>>;  
145 - };  
146 -  
147 - template<typename Accumulated, typename... Remaining>  
148 - struct deduplicate_impl;  
149 -  
150 - template<typename Accumulated>  
151 - struct deduplicate_impl<Accumulated>  
152 - {  
153 - using type = Accumulated;  
154 - };  
155 -  
156 - template<typename Accumulated, typename T, typename... Rest>  
157 - struct deduplicate_impl<Accumulated, T, Rest...>  
158 - {  
159 - using next = typename variant_append_if_unique<Accumulated, T>::type;  
160 - using type = typename deduplicate_impl<next, Rest...>::type;  
161 - };  
162 -  
163 - // Deduplicated variant; void types become monostate before deduplication  
164 - template<typename T0, typename... Ts>  
165 - using unique_variant_t = typename deduplicate_impl<  
166 - std::variant<void_to_monostate_t<T0>>,  
167 - void_to_monostate_t<Ts>...>::type;  
168 -  
169 - // Result: (winner_index, deduplicated_variant). Use index to disambiguate  
170 - // when multiple tasks share the same return type.  
171 - template<typename T0, typename... Ts>  
172 - using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;  
173 -  
174   118  
175   /** Core shared state for when_any operations. 119   /** Core shared state for when_any operations.
176   120  
177   Contains all members and methods common to both heterogeneous (variadic) 121   Contains all members and methods common to both heterogeneous (variadic)
178   and homogeneous (range) when_any implementations. State classes embed 122   and homogeneous (range) when_any implementations. State classes embed
179   this via composition to avoid CRTP destructor ordering issues. 123   this via composition to avoid CRTP destructor ordering issues.
180   124  
181   @par Thread Safety 125   @par Thread Safety
182   Atomic operations protect winner selection and completion count. 126   Atomic operations protect winner selection and completion count.
183   */ 127   */
184   struct when_any_core 128   struct when_any_core
185   { 129   {
186   std::atomic<std::size_t> remaining_count_; 130   std::atomic<std::size_t> remaining_count_;
187   std::size_t winner_index_{0}; 131   std::size_t winner_index_{0};
188   std::exception_ptr winner_exception_; 132   std::exception_ptr winner_exception_;
189   std::stop_source stop_source_; 133   std::stop_source stop_source_;
190   134  
191   // Bridges parent's stop token to our stop_source 135   // Bridges parent's stop token to our stop_source
192   struct stop_callback_fn 136   struct stop_callback_fn
193   { 137   {
194   std::stop_source* source_; 138   std::stop_source* source_;
HITCBC 195   9 void operator()() const noexcept { source_->request_stop(); } 139   2 void operator()() const noexcept { source_->request_stop(); }
196   }; 140   };
197   using stop_callback_t = std::stop_callback<stop_callback_fn>; 141   using stop_callback_t = std::stop_callback<stop_callback_fn>;
198   std::optional<stop_callback_t> parent_stop_callback_; 142   std::optional<stop_callback_t> parent_stop_callback_;
199   143  
200 - std::coroutine_handle<> continuation_; 144 + continuation continuation_;
201   io_env const* caller_env_ = nullptr; 145   io_env const* caller_env_ = nullptr;
202   146  
203   // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members) 147   // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
204   std::atomic<bool> has_winner_{false}; 148   std::atomic<bool> has_winner_{false};
205   149  
HITCBC 206   65 explicit when_any_core(std::size_t count) noexcept 150   31 explicit when_any_core(std::size_t count) noexcept
HITCBC 207   65 : remaining_count_(count) 151   31 : remaining_count_(count)
208   { 152   {
HITCBC 209   65 } 153   31 }
210   154  
211   /** Atomically claim winner status; exactly one task succeeds. */ 155   /** Atomically claim winner status; exactly one task succeeds. */
HITCBC 212   190 bool try_win(std::size_t index) noexcept 156   52 bool try_win(std::size_t index) noexcept
213   { 157   {
HITCBC 214   190 bool expected = false; 158   52 bool expected = false;
HITCBC 215   190 if(has_winner_.compare_exchange_strong( 159   52 if(has_winner_.compare_exchange_strong(
216   expected, true, std::memory_order_acq_rel)) 160   expected, true, std::memory_order_acq_rel))
217   { 161   {
HITCBC 218   65 winner_index_ = index; 162   22 winner_index_ = index;
HITCBC 219   65 stop_source_.request_stop(); 163   22 stop_source_.request_stop();
HITCBC 220   65 return true; 164   22 return true;
221   } 165   }
HITCBC 222   125 return false; 166   30 return false;
223   } 167   }
224   168  
225   /** @pre try_win() returned true. */ 169   /** @pre try_win() returned true. */
MISLBC 226   8 void set_winner_exception(std::exception_ptr ep) noexcept 170   void set_winner_exception(std::exception_ptr ep) noexcept
227   { 171   {
MISLBC 228   8 winner_exception_ = ep; 172   winner_exception_ = ep;
MISLBC 229   8 } 173   }
230   174  
231   // Runners signal completion directly via final_suspend; no member function needed. 175   // Runners signal completion directly via final_suspend; no member function needed.
232   }; 176   };
233   177  
234 - /** Shared state for heterogeneous when_any operation. 178 + } // namespace detail
235 -  
236 - Coordinates winner selection, result storage, and completion tracking  
237 - for all child tasks in a when_any operation. Uses composition with  
238 - when_any_core for shared functionality.  
239   179  
240 - @par Lifetime 180 + namespace detail {
241 - Allocated on the parent coroutine's frame, outlives all runners.  
242   181  
243 - @tparam T0 First task's result type. 182 + // State for io_result-aware when_any: only !ec wins.
244 - @tparam Ts Remaining tasks' result types. 183 + template<typename... Ts>
245 - */ 184 + struct when_any_io_state
246 - template<typename T0, typename... Ts>  
247 - struct when_any_state  
248   { 185   {
249 - static constexpr std::size_t task_count = 1 + sizeof...(Ts); 186 + static constexpr std::size_t task_count = sizeof...(Ts);
250 - using variant_type = unique_variant_t<T0, Ts...>; 187 + using variant_type = std::variant<std::error_code, Ts...>;
251   188  
252   when_any_core core_; 189   when_any_core core_;
253   std::optional<variant_type> result_; 190   std::optional<variant_type> result_;
254 - std::array<std::coroutine_handle<>, task_count> runner_handles_{}; 191 + std::array<continuation, task_count> runner_handles_{};
255   192  
ECB 256 - 43 when_any_state() 193 + // Last failure (error or exception) for the all-fail case.
  194 + // Last writer wins — no priority between errors and exceptions.
  195 + std::mutex failure_mu_;
  196 + std::error_code last_error_;
  197 + std::exception_ptr last_exception_;
  198 +
HITGNC   199 + 16 when_any_io_state()
HITCBC 257   43 : core_(task_count) 200   16 : core_(task_count)
258   { 201   {
HITCBC 259   43 } 202   16 }
260   203  
HITGIC 261 - // Runners self-destruct in final_suspend. No destruction needed here. 204 + 12 void record_error(std::error_code ec)
262 -  
263 - /** @pre core_.try_win() returned true.  
264 - @note Uses in_place_type (not index) because variant is deduplicated.  
265 - */  
266 - template<typename T>  
DCB 267 - 35 void set_winner_result(T value)  
268 - noexcept(std::is_nothrow_move_constructible_v<T>)  
269   { 205   {
HITCBC 270 - 35 result_.emplace(std::in_place_type<T>, std::move(value)); 206 + 12 std::lock_guard lk(failure_mu_);
HITGNC   207 + 12 last_error_ = ec;
HITGNC   208 + 12 last_exception_ = nullptr;
HITCBC 271   35 } 209   12 }
272   210  
HITGIC 273 - /** @pre core_.try_win() returned true. */ 211 + 7 void record_exception(std::exception_ptr ep)
DCB 274 - 3 void set_winner_void() noexcept  
275   { 212   {
HITCBC 276 - 3 result_.emplace(std::in_place_type<std::monostate>, std::monostate{}); 213 + 7 std::lock_guard lk(failure_mu_);
HITGNC   214 + 7 last_exception_ = ep;
HITGNC   215 + 7 last_error_ = {};
HITCBC 277   3 } 216   7 }
278   }; 217   };
279   218  
280 - /** Wrapper coroutine that runs a single child task for when_any. 219 + // Wrapper coroutine for io_result-aware when_any children.
281 - 220 + // unhandled_exception records the exception but does NOT claim winner status.
282 - Propagates executor/stop_token to the child, attempts to claim winner  
283 - status on completion, and signals completion for cleanup coordination.  
284 -  
285 - @tparam StateType The state type (when_any_state or when_any_homogeneous_state).  
286 - */  
287   template<typename StateType> 221   template<typename StateType>
288 - struct when_any_runner 222 + struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_any_io_runner
289   { 223   {
290 - struct promise_type // : frame_allocating_base // DISABLED FOR TESTING 224 + struct promise_type
  225 + : frame_alloc_mixin
291   { 226   {
292   StateType* state_ = nullptr; 227   StateType* state_ = nullptr;
293   std::size_t index_ = 0; 228   std::size_t index_ = 0;
294   io_env env_; 229   io_env env_;
295   230  
HITCBC 296 - 190 when_any_runner get_return_object() noexcept 231 + 82 when_any_io_runner get_return_object() noexcept
297   { 232   {
ECB 298 - 190 return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this)); 233 + return when_any_io_runner(
HITGNC   234 + 82 std::coroutine_handle<promise_type>::from_promise(*this));
299   } 235   }
300   236  
HITGIC 301 - // Starts suspended; launcher sets up state/ex/token then resumes 237 + 82 std::suspend_always initial_suspend() noexcept { return {}; }
DCB 302 - 190 std::suspend_always initial_suspend() noexcept  
303 - {  
DCB 304 - 190 return {};  
305 - }  
306   238  
HITCBC 307   190 auto final_suspend() noexcept 239   82 auto final_suspend() noexcept
308   { 240   {
309   struct awaiter 241   struct awaiter
310   { 242   {
311   promise_type* p_; 243   promise_type* p_;
HITCBC 312   190 bool await_ready() const noexcept { return false; } 244   82 bool await_ready() const noexcept { return false; }
HITCBC 313 - 190 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept 245 + 82 auto await_suspend(std::coroutine_handle<> h) noexcept
314 - // Extract everything needed before self-destruction.  
315   { 246   {
HITCBC 316   190 auto& core = p_->state_->core_; 247   82 auto& core = p_->state_->core_;
HITCBC 317   190 auto* counter = &core.remaining_count_; 248   82 auto* counter = &core.remaining_count_;
HITCBC 318   190 auto* caller_env = core.caller_env_; 249   82 auto* caller_env = core.caller_env_;
HITCBC 319 - 190 auto cont = core.continuation_; 250 + 82 auto& cont = core.continuation_;
320   251  
HITCBC 321   190 h.destroy(); 252   82 h.destroy();
322 - // If last runner, dispatch parent for symmetric transfer.  
323   253  
HITCBC 324   190 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); 254   82 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
HITCBC 325   190 if(remaining == 1) 255   82 if(remaining == 1)
HITCBC 326 - 65 return caller_env->executor.dispatch(cont); 256 + 31 return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
HITCBC 327 - 125 return std::noop_coroutine(); 257 + 51 return detail::symmetric_transfer(std::noop_coroutine());
328   } 258   }
MISUBC 329   void await_resume() const noexcept {} 259   void await_resume() const noexcept {}
330   }; 260   };
HITCBC 331   190 return awaiter{this}; 261   82 return awaiter{this};
332   } 262   }
333   263  
HITCBC 334   178 void return_void() noexcept {} 264   71 void return_void() noexcept {}
335   265  
336 - // Exceptions are valid completions in when_any (unlike when_all) 266 + // Exceptions do NOT win in io_result when_any
HITCBC 337 - 12 void unhandled_exception() 267 + 11 void unhandled_exception() noexcept
338   { 268   {
HITCBC 339 - 12 if(state_->core_.try_win(index_)) 269 + 11 state_->record_exception(std::current_exception());
DCB 340 - 8 state_->core_.set_winner_exception(std::current_exception());  
HITCBC 341   12 } 270   11 }
342 - /** Injects executor and stop token into child awaitables. */  
343   271  
344   template<class Awaitable> 272   template<class Awaitable>
345   struct transform_awaiter 273   struct transform_awaiter
346   { 274   {
347   std::decay_t<Awaitable> a_; 275   std::decay_t<Awaitable> a_;
348   promise_type* p_; 276   promise_type* p_;
349   277  
HITCBC 350   190 bool await_ready() { return a_.await_ready(); } 278   82 bool await_ready() { return a_.await_ready(); }
HITCBC 351 - 190 auto await_resume() { return a_.await_resume(); } 279 + 82 decltype(auto) await_resume() { return a_.await_resume(); }
352   280  
353   template<class Promise> 281   template<class Promise>
HITCBC 354   185 auto await_suspend(std::coroutine_handle<Promise> h) 282   81 auto await_suspend(std::coroutine_handle<Promise> h)
355   { 283   {
ECB 356 - 185 return a_.await_suspend(h, &p_->env_); 284 + using R = decltype(a_.await_suspend(h, &p_->env_));
  285 + if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
HITGNC   286 + 81 return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
  287 + else
  288 + return a_.await_suspend(h, &p_->env_);
357   } 289   }
358   }; 290   };
359   291  
360   template<class Awaitable> 292   template<class Awaitable>
HITCBC 361   190 auto await_transform(Awaitable&& a) 293   82 auto await_transform(Awaitable&& a)
362   { 294   {
363   using A = std::decay_t<Awaitable>; 295   using A = std::decay_t<Awaitable>;
364   if constexpr (IoAwaitable<A>) 296   if constexpr (IoAwaitable<A>)
365   { 297   {
366   return transform_awaiter<Awaitable>{ 298   return transform_awaiter<Awaitable>{
HITCBC 367   380 std::forward<Awaitable>(a), this}; 299   163 std::forward<Awaitable>(a), this};
368   } 300   }
369   else 301   else
370   { 302   {
371   static_assert(sizeof(A) == 0, "requires IoAwaitable"); 303   static_assert(sizeof(A) == 0, "requires IoAwaitable");
372   } 304   }
HITCBC 373   190 } 305   81 }
374   }; 306   };
375   307  
376   std::coroutine_handle<promise_type> h_; 308   std::coroutine_handle<promise_type> h_;
377   309  
HITCBC 378 - 190 explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept 310 + 82 explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
HITCBC 379   190 : h_(h) 311   82 : h_(h)
380   { 312   {
HITCBC 381   190 } 313   82 }
382   314  
383 - // Enable move for all clang versions - some versions need it 315 + when_any_io_runner(when_any_io_runner&& other) noexcept
384 - when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {} 316 + : h_(std::exchange(other.h_, nullptr))
  317 + {
  318 + }
385   319  
386 - // Non-copyable 320 + when_any_io_runner(when_any_io_runner const&) = delete;
387 - when_any_runner(when_any_runner const&) = delete; 321 + when_any_io_runner& operator=(when_any_io_runner const&) = delete;
388 - when_any_runner& operator=(when_any_runner const&) = delete; 322 + when_any_io_runner& operator=(when_any_io_runner&&) = delete;
389 - when_any_runner& operator=(when_any_runner&&) = delete;  
390   323  
HITCBC 391   190 auto release() noexcept 324   82 auto release() noexcept
392   { 325   {
HITCBC 393   190 return std::exchange(h_, nullptr); 326   82 return std::exchange(h_, nullptr);
394   } 327   }
395   }; 328   };
396   329  
397 - /** Wraps a child awaitable, attempts to claim winner on completion. 330 + // Runner coroutine: only tries to win when the child returns !ec.
398 - 331 + template<std::size_t I, IoAwaitable Awaitable, typename StateType>
399 - Uses requires-expressions to detect state capabilities: 332 + when_any_io_runner<StateType>
HITGIC 400 - - set_winner_void(): for heterogeneous void tasks (stores monostate) 333 + 30 make_when_any_io_runner(Awaitable inner, StateType* state)
401 - - set_winner_result(): for non-void tasks  
402 - - Neither: for homogeneous void tasks (no result storage)  
403 - */  
404 - template<IoAwaitable Awaitable, typename StateType>  
405 - when_any_runner<StateType>  
DCB 406 - 190 make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)  
407   { 334   {
408 - using T = awaitable_result_t<Awaitable>; 335 + auto result = co_await std::move(inner);
409 - if constexpr (std::is_void_v<T>) 336 +
410 - { 337 + if(!result.ec)
411 - co_await std::move(inner);  
412 - if(state->core_.try_win(index))  
413 - {  
414 - // Heterogeneous void tasks store monostate in the variant  
415 - if constexpr (requires { state->set_winner_void(); })  
416 - state->set_winner_void();  
417 - // Homogeneous void tasks have no result to store  
418 - }  
419 - }  
420 - else  
421   { 338   {
422 - auto result = co_await std::move(inner); 339 + // Success: try to claim winner
423 - if(state->core_.try_win(index)) 340 + if(state->core_.try_win(I))
424 - // Defensive: move should not throw (already moved once), but we  
425 - // catch just in case since an uncaught exception would be devastating.  
426   { 341   {
427   try 342   try
428   { 343   {
429 - state->set_winner_result(std::move(result)); 344 + state->result_.emplace(
  345 + std::in_place_index<I + 1>,
  346 + detail::extract_io_payload(std::move(result)));
430   } 347   }
431   catch(...) 348   catch(...)
432   { 349   {
433   state->core_.set_winner_exception(std::current_exception()); 350   state->core_.set_winner_exception(std::current_exception());
434   } 351   }
435   } 352   }
436   } 353   }
  354 + else
  355 + {
  356 + // Error: record but don't win
  357 + state->record_error(result.ec);
  358 + }
HITCBC 437   380 } 359   60 }
438   360  
439 - /** Launches all runners concurrently; see await_suspend for lifetime concerns. */ 361 + // Launcher for io_result-aware when_any.
440   template<IoAwaitable... Awaitables> 362   template<IoAwaitable... Awaitables>
441 - class when_any_launcher 363 + class when_any_io_launcher
442   { 364   {
443 - using state_type = when_any_state<awaitable_result_t<Awaitables>...>; 365 + using state_type = when_any_io_state<
  366 + io_result_payload_t<awaitable_result_t<Awaitables>>...>;
444   367  
445   std::tuple<Awaitables...>* tasks_; 368   std::tuple<Awaitables...>* tasks_;
446   state_type* state_; 369   state_type* state_;
447   370  
448   public: 371   public:
HITCBC 449 - 43 when_any_launcher( 372 + 16 when_any_io_launcher(
450   std::tuple<Awaitables...>* tasks, 373   std::tuple<Awaitables...>* tasks,
451   state_type* state) 374   state_type* state)
HITCBC 452   43 : tasks_(tasks) 375   16 : tasks_(tasks)
HITCBC 453   43 , state_(state) 376   16 , state_(state)
454   { 377   {
HITCBC 455   43 } 378   16 }
456   379  
HITCBC 457   43 bool await_ready() const noexcept 380   16 bool await_ready() const noexcept
458   { 381   {
HITCBC 459   43 return sizeof...(Awaitables) == 0; 382   16 return sizeof...(Awaitables) == 0;
460   } 383   }
461   384  
HITGIC 462 - /** CRITICAL: If the last task finishes synchronously, parent resumes and 385 + 16 std::coroutine_handle<> await_suspend(
463 - destroys this object before await_suspend returns. Must not reference 386 + std::coroutine_handle<> continuation, io_env const* caller_env)
464 - `this` after the final launch_one call.  
465 - */  
DCB 466 - 43 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)  
467   { 387   {
HITCBC 468 - 43 state_->core_.continuation_ = continuation; 388 + 16 state_->core_.continuation_.h = continuation;
HITCBC 469   43 state_->core_.caller_env_ = caller_env; 389   16 state_->core_.caller_env_ = caller_env;
470   390  
HITCBC 471   43 if(caller_env->stop_token.stop_possible()) 391   16 if(caller_env->stop_token.stop_possible())
472   { 392   {
HITCBC 473   18 state_->core_.parent_stop_callback_.emplace( 393   2 state_->core_.parent_stop_callback_.emplace(
HITCBC 474   9 caller_env->stop_token, 394   1 caller_env->stop_token,
HITCBC 475   9 when_any_core::stop_callback_fn{&state_->core_.stop_source_}); 395   1 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
476   396  
HITCBC 477   9 if(caller_env->stop_token.stop_requested()) 397   1 if(caller_env->stop_token.stop_requested())
MISLBC 478   3 state_->core_.stop_source_.request_stop(); 398   state_->core_.stop_source_.request_stop();
479   } 399   }
480   400  
HITCBC 481   43 auto token = state_->core_.stop_source_.get_token(); 401   16 auto token = state_->core_.stop_source_.get_token();
HITCBC 482   86 [&]<std::size_t... Is>(std::index_sequence<Is...>) { 402   28 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
HITCBC 483   43 (..., launch_one<Is>(caller_env->executor, token)); 403   16 (..., launch_one<Is>(caller_env->executor, token));
HITCBC 484   43 }(std::index_sequence_for<Awaitables...>{}); 404   16 }(std::index_sequence_for<Awaitables...>{});
485   405  
HITCBC 486   86 return std::noop_coroutine(); 406   32 return std::noop_coroutine();
HITCBC 487   43 } 407   16 }
488   408  
HITCBC 489 - 43 void await_resume() const noexcept 409 + 16 void await_resume() const noexcept {}
490 - {  
DCB 491 - 43 }  
492   410  
493 - /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */  
494   private: 411   private:
495   template<std::size_t I> 412   template<std::size_t I>
HITCBC 496   105 void launch_one(executor_ref caller_ex, std::stop_token token) 413   30 void launch_one(executor_ref caller_ex, std::stop_token token)
497   { 414   {
HITCBC 498 - 105 auto runner = make_when_any_runner( 415 + 30 auto runner = make_when_any_io_runner<I>(
HITCBC 499 - 105 std::move(std::get<I>(*tasks_)), state_, I); 416 + 30 std::move(std::get<I>(*tasks_)), state_);
500   417  
HITCBC 501   105 auto h = runner.release(); 418   30 auto h = runner.release();
HITCBC 502   105 h.promise().state_ = state_; 419   30 h.promise().state_ = state_;
HITCBC 503   105 h.promise().index_ = I; 420   30 h.promise().index_ = I;
HITCBC 504 - 105 h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator}; 421 + 30 h.promise().env_ = io_env{caller_ex, token,
HITGNC   422 + 30 state_->core_.caller_env_->frame_allocator};
505   423  
HITCBC 506 - 105 std::coroutine_handle<> ch{h}; 424 + 30 state_->runner_handles_[I].h = std::coroutine_handle<>{h};
HITCBC 507 - 105 state_->runner_handles_[I] = ch; 425 + 30 caller_ex.post(state_->runner_handles_[I]);
DCB 508 - 105 caller_ex.post(ch);  
HITCBC 509   210 } 426   60 }
510   }; 427   };
511   428  
512 - } // namespace detail 429 + /** Shared state for homogeneous io_result-aware when_any (range overload).
513 -  
514 - /** Wait for the first awaitable to complete.  
515 -  
516 - Races multiple heterogeneous awaitables concurrently and returns when the  
517 - first one completes. The result includes the winner's index and a  
518 - deduplicated variant containing the result value.  
519 -  
520 - @par Suspends  
521 - The calling coroutine suspends when co_await is invoked. All awaitables  
522 - are launched concurrently and execute in parallel. The coroutine resumes  
523 - only after all awaitables have completed, even though the winner is  
524 - determined by the first to finish.  
525 -  
526 - @par Completion Conditions  
527 - @li Winner is determined when the first awaitable completes (success or exception)  
528 - @li Only one task can claim winner status via atomic compare-exchange  
529 - @li Once a winner exists, stop is requested for all remaining siblings  
530 - @li Parent coroutine resumes only after all siblings acknowledge completion  
531 - @li The winner's result is returned; if the winner threw, the exception is rethrown  
532 -  
533 - @par Cancellation Semantics  
534 - Cancellation is supported via stop_token propagated through the  
535 - IoAwaitable protocol:  
536 - @li Each child awaitable receives a stop_token derived from a shared stop_source  
537 - @li When the parent's stop token is activated, the stop is forwarded to all children  
538 - @li When a winner is determined, stop_source_.request_stop() is called immediately  
539 - @li Siblings must handle cancellation gracefully and complete before parent resumes  
540 - @li Stop requests are cooperative; tasks must check and respond to them  
541 -  
542 - @par Concurrency/Overlap  
543 - All awaitables are launched concurrently before any can complete.  
544 - The launcher iterates through the arguments, starting each task on the  
545 - caller's executor. Tasks may execute in parallel on multi-threaded  
546 - executors or interleave on single-threaded executors. There is no  
547 - guaranteed ordering of task completion.  
548 -  
549 - @par Notable Error Conditions  
550 - @li Winner exception: if the winning task threw, that exception is rethrown  
551 - @li Non-winner exceptions: silently discarded (only winner's result matters)  
552 - @li Cancellation: tasks may complete via cancellation without throwing  
553 -  
554 - @par Example  
555 - @code  
556 - task<void> example() {  
557 - auto [index, result] = co_await when_any(  
558 - fetch_from_primary(), // task<Response>  
559 - fetch_from_backup() // task<Response>  
560 - );  
561 - // index is 0 or 1, result holds the winner's Response  
562 - auto response = std::get<Response>(result);  
563 - }  
564 - @endcode  
565 -  
566 - @par Example with Heterogeneous Types  
567 - @code  
568 - task<void> mixed_types() {  
569 - auto [index, result] = co_await when_any(  
570 - fetch_int(), // task<int>  
571 - fetch_string() // task<std::string>  
572 - );  
573 - if (index == 0)  
574 - std::cout << "Got int: " << std::get<int>(result) << "\n";  
575 - else  
576 - std::cout << "Got string: " << std::get<std::string>(result) << "\n";  
577 - }  
578 - @endcode  
579 -  
580 - @tparam A0 First awaitable type (must satisfy IoAwaitable).  
581 - @tparam As Remaining awaitable types (must satisfy IoAwaitable).  
582 - @param a0 The first awaitable to race.  
583 - @param as Additional awaitables to race concurrently.  
584 - @return A task yielding a pair of (winner_index, result_variant).  
585 -  
586 - @throws Rethrows the winner's exception if the winning task threw an exception.  
587 -  
588 - @par Remarks  
589 - Awaitables are moved into the coroutine frame; original objects become  
590 - empty after the call. When multiple awaitables share the same return type,  
591 - the variant is deduplicated to contain only unique types. Use the winner  
592 - index to determine which awaitable completed first. Void awaitables  
593 - contribute std::monostate to the variant.  
594 -  
595 - @see when_all, IoAwaitable  
596 - */  
597 - template<IoAwaitable A0, IoAwaitable... As>  
DCB 598 - 43 [[nodiscard]] auto when_any(A0 a0, As... as)  
599 - -> task<detail::when_any_result_t<  
600 - detail::awaitable_result_t<A0>,  
601 - detail::awaitable_result_t<As>...>>  
602 - {  
603 - using result_type = detail::when_any_result_t<  
604 - detail::awaitable_result_t<A0>,  
605 - detail::awaitable_result_t<As>...>;  
606 -  
607 - detail::when_any_state<  
608 - detail::awaitable_result_t<A0>,  
609 - detail::awaitable_result_t<As>...> state;  
610 - std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);  
611 -  
612 - co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);  
613 -  
614 - if(state.core_.winner_exception_)  
615 - std::rethrow_exception(state.core_.winner_exception_);  
616 -  
617 - co_return result_type{state.core_.winner_index_, std::move(*state.result_)};  
DCB 618 - 86 }  
619 -  
620 - /** Concept for ranges of full I/O awaitables.  
621 -  
622 - A range satisfies `IoAwaitableRange` if it is a sized input range  
623 - whose value type satisfies @ref IoAwaitable. This enables when_any  
624 - to accept any container or view of awaitables, not just std::vector.  
625 -  
626 - @tparam R The range type.  
627 -  
628 - @par Requirements  
629 - @li `R` must satisfy `std::ranges::input_range`  
630 - @li `R` must satisfy `std::ranges::sized_range`  
631 - @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable  
632 -  
633 - @par Syntactic Requirements  
634 - Given `r` of type `R`:  
635 - @li `std::ranges::begin(r)` is valid  
636 - @li `std::ranges::end(r)` is valid  
637 - @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`  
638 - @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable  
639 -  
640 - @par Example  
641 - @code  
642 - template<IoAwaitableRange R>  
643 - task<void> race_all(R&& awaitables) {  
644 - auto winner = co_await when_any(std::forward<R>(awaitables));  
645 - // Process winner...  
646 - }  
647 - @endcode  
648 -  
649 - @see when_any, IoAwaitable  
650 - */  
651 - template<typename R>  
652 - concept IoAwaitableRange =  
653 - std::ranges::input_range<R> &&  
654 - std::ranges::sized_range<R> &&  
655 - IoAwaitable<std::ranges::range_value_t<R>>;  
656 -  
657 - namespace detail {  
658 -  
659 - /** Shared state for homogeneous when_any (range overload).  
660   430  
661 - Uses composition with when_any_core for shared functionality. 431 + @tparam T The payload type extracted from io_result.
662 - Simpler than heterogeneous: optional<T> instead of variant, vector  
663 - instead of array for runner handles.  
664   */ 432   */
665   template<typename T> 433   template<typename T>
666 - struct when_any_homogeneous_state 434 + struct when_any_io_homogeneous_state
667   { 435   {
668   when_any_core core_; 436   when_any_core core_;
669   std::optional<T> result_; 437   std::optional<T> result_;
670 - std::vector<std::coroutine_handle<>> runner_handles_; 438 + std::unique_ptr<continuation[]> runner_handles_;
671   439  
ECB 672 - 19 explicit when_any_homogeneous_state(std::size_t count) 440 + std::mutex failure_mu_;
  441 + std::error_code last_error_;
  442 + std::exception_ptr last_exception_;
  443 +
HITGNC   444 + 13 explicit when_any_io_homogeneous_state(std::size_t count)
HITCBC 673   19 : core_(count) 445   13 : core_(count)
HITCBC 674 - 38 , runner_handles_(count) 446 + 13 , runner_handles_(std::make_unique<continuation[]>(count))
675   { 447   {
HITCBC 676   19 } 448   13 }
677   449  
HITGIC 678 - // Runners self-destruct in final_suspend. No destruction needed here. 450 + 6 void record_error(std::error_code ec)
  451 + {
HITGNC   452 + 6 std::lock_guard lk(failure_mu_);
HITGNC   453 + 6 last_error_ = ec;
HITGNC   454 + 6 last_exception_ = nullptr;
HITGNC   455 + 6 }
679   456  
HITGIC 680 - /** @pre core_.try_win() returned true. */ 457 + 4 void record_exception(std::exception_ptr ep)
DCB 681 - 17 void set_winner_result(T value)  
682 - noexcept(std::is_nothrow_move_constructible_v<T>)  
683   { 458   {
HITCBC 684 - 17 result_.emplace(std::move(value)); 459 + 4 std::lock_guard lk(failure_mu_);
HITGNC   460 + 4 last_exception_ = ep;
HITGNC   461 + 4 last_error_ = {};
HITCBC 685   17 } 462   4 }
686   }; 463   };
687   464  
688 - /** Specialization for void tasks (no result storage needed). */ 465 + /** Specialization for void io_result children (no payload storage). */
689   template<> 466   template<>
690 - struct when_any_homogeneous_state<void> 467 + struct when_any_io_homogeneous_state<std::tuple<>>
691   { 468   {
692   when_any_core core_; 469   when_any_core core_;
693 - std::vector<std::coroutine_handle<>> runner_handles_; 470 + std::unique_ptr<continuation[]> runner_handles_;
694   471  
ECB 695 - 3 explicit when_any_homogeneous_state(std::size_t count) 472 + std::mutex failure_mu_;
  473 + std::error_code last_error_;
  474 + std::exception_ptr last_exception_;
  475 +
HITGNC   476 + 2 explicit when_any_io_homogeneous_state(std::size_t count)
HITCBC 696   3 : core_(count) 477   2 : core_(count)
HITCBC 697 - 6 , runner_handles_(count) 478 + 2 , runner_handles_(std::make_unique<continuation[]>(count))
698   { 479   {
HITCBC 699   3 } 480   2 }
700   481  
HITGIC 701 - // Runners self-destruct in final_suspend. No destruction needed here. 482 + 1 void record_error(std::error_code ec)
  483 + {
HITGNC   484 + 1 std::lock_guard lk(failure_mu_);
HITGNC   485 + 1 last_error_ = ec;
HITGNC   486 + 1 last_exception_ = nullptr;
HITGNC   487 + 1 }
702   488  
MISUIC 703 - // No set_winner_result - void tasks have no result to store 489 + void record_exception(std::exception_ptr ep)
  490 + {
MISUNC   491 + std::lock_guard lk(failure_mu_);
MISUNC   492 + last_exception_ = ep;
MISUNC   493 + last_error_ = {};
MISUNC   494 + }
704   }; 495   };
705   496  
706 - /** Launches all runners concurrently; see await_suspend for lifetime concerns. */ 497 + /** Create an io_result-aware runner for homogeneous when_any (range path).
  498 +
  499 + Only tries to win when the child returns !ec.
  500 + */
  501 + template<IoAwaitable Awaitable, typename StateType>
  502 + when_any_io_runner<StateType>
HITGNC   503 + 52 make_when_any_io_homogeneous_runner(
  504 + Awaitable inner, StateType* state, std::size_t index)
  505 + {
  506 + auto result = co_await std::move(inner);
  507 +
  508 + if(!result.ec)
  509 + {
  510 + if(state->core_.try_win(index))
  511 + {
  512 + using PayloadT = io_result_payload_t<
  513 + awaitable_result_t<Awaitable>>;
  514 + if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
  515 + {
  516 + try
  517 + {
  518 + state->result_.emplace(
  519 + extract_io_payload(std::move(result)));
  520 + }
  521 + catch(...)
  522 + {
  523 + state->core_.set_winner_exception(
  524 + std::current_exception());
  525 + }
  526 + }
  527 + }
  528 + }
  529 + else
  530 + {
  531 + state->record_error(result.ec);
  532 + }
HITGNC   533 + 104 }
  534 +
  535 + /** Launches all io_result-aware homogeneous runners concurrently. */
707   template<IoAwaitableRange Range> 536   template<IoAwaitableRange Range>
708 - class when_any_homogeneous_launcher 537 + class when_any_io_homogeneous_launcher
709   { 538   {
710   using Awaitable = std::ranges::range_value_t<Range>; 539   using Awaitable = std::ranges::range_value_t<Range>;
711 - using T = awaitable_result_t<Awaitable>; 540 + using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
712   541  
713   Range* range_; 542   Range* range_;
714 - when_any_homogeneous_state<T>* state_; 543 + when_any_io_homogeneous_state<PayloadT>* state_;
715   544  
716   public: 545   public:
HITCBC 717 - 22 when_any_homogeneous_launcher( 546 + 15 when_any_io_homogeneous_launcher(
718   Range* range, 547   Range* range,
719 - when_any_homogeneous_state<T>* state) 548 + when_any_io_homogeneous_state<PayloadT>* state)
HITCBC 720   22 : range_(range) 549   15 : range_(range)
HITCBC 721   22 , state_(state) 550   15 , state_(state)
722   { 551   {
HITCBC 723   22 } 552   15 }
724   553  
HITCBC 725   22 bool await_ready() const noexcept 554   15 bool await_ready() const noexcept
726   { 555   {
HITCBC 727   22 return std::ranges::empty(*range_); 556   15 return std::ranges::empty(*range_);
728   } 557   }
729   558  
HITGIC 730 - /** CRITICAL: If the last task finishes synchronously, parent resumes and 559 + 15 std::coroutine_handle<> await_suspend(
731 - destroys this object before await_suspend returns. Must not reference 560 + std::coroutine_handle<> continuation, io_env const* caller_env)
732 - `this` after dispatching begins.  
733 -  
734 - Two-phase approach:  
735 - 1. Create all runners (safe - no dispatch yet)  
736 - 2. Dispatch all runners (any may complete synchronously)  
737 - */  
DCB 738 - 22 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)  
739   { 561   {
HITCBC 740 - 22 state_->core_.continuation_ = continuation; 562 + 15 state_->core_.continuation_.h = continuation;
HITCBC 741   22 state_->core_.caller_env_ = caller_env; 563   15 state_->core_.caller_env_ = caller_env;
742   564  
HITCBC 743   22 if(caller_env->stop_token.stop_possible()) 565   15 if(caller_env->stop_token.stop_possible())
744   { 566   {
HITCBC 745   14 state_->core_.parent_stop_callback_.emplace( 567   4 state_->core_.parent_stop_callback_.emplace(
HITCBC 746   7 caller_env->stop_token, 568   2 caller_env->stop_token,
HITCBC 747   7 when_any_core::stop_callback_fn{&state_->core_.stop_source_}); 569   2 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
748   570  
HITCBC 749   7 if(caller_env->stop_token.stop_requested()) 571   2 if(caller_env->stop_token.stop_requested())
HITCBC 750   4 state_->core_.stop_source_.request_stop(); 572   1 state_->core_.stop_source_.request_stop();
751   } 573   }
752   574  
HITCBC 753   22 auto token = state_->core_.stop_source_.get_token(); 575   15 auto token = state_->core_.stop_source_.get_token();
754   576  
755 - // This iterates over *range_ safely because no runners execute yet.  
756   // Phase 1: Create all runners without dispatching. 577   // Phase 1: Create all runners without dispatching.
HITCBC 757   22 std::size_t index = 0; 578   15 std::size_t index = 0;
HITCBC 758   107 for(auto&& a : *range_) 579   67 for(auto&& a : *range_)
759   { 580   {
HITCBC 760 - 85 auto runner = make_when_any_runner( 581 + 52 auto runner = make_when_any_io_homogeneous_runner(
HITCBC 761   85 std::move(a), state_, index); 582   52 std::move(a), state_, index);
762   583  
HITCBC 763   85 auto h = runner.release(); 584   52 auto h = runner.release();
HITCBC 764   85 h.promise().state_ = state_; 585   52 h.promise().state_ = state_;
HITCBC 765   85 h.promise().index_ = index; 586   52 h.promise().index_ = index;
HITCBC 766 - 85 h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator}; 587 + 52 h.promise().env_ = io_env{caller_env->executor, token,
HITGNC   588 + 52 caller_env->frame_allocator};
767   589  
HITCBC 768 - 85 state_->runner_handles_[index] = std::coroutine_handle<>{h}; 590 + 52 state_->runner_handles_[index].h = std::coroutine_handle<>{h};
HITCBC 769   85 ++index; 591   52 ++index;
770   } 592   }
771   593  
772   // Phase 2: Post all runners. Any may complete synchronously. 594   // Phase 2: Post all runners. Any may complete synchronously.
HITGIC 773 - // After last post, state_ and this may be destroyed. 595 + 15 auto* handles = state_->runner_handles_.get();
HITGIC 774 - // Use raw pointer/count captured before posting. 596 + 15 std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
DCB 775 - 22 std::coroutine_handle<>* handles = state_->runner_handles_.data();  
DCB 776 - 22 std::size_t count = state_->runner_handles_.size();  
HITCBC 777   107 for(std::size_t i = 0; i < count; ++i) 597   67 for(std::size_t i = 0; i < count; ++i)
HITCBC 778   85 caller_env->executor.post(handles[i]); 598   52 caller_env->executor.post(handles[i]);
779   599  
HITCBC 780   44 return std::noop_coroutine(); 600   30 return std::noop_coroutine();
HITCBC 781   107 } 601   67 }
782   602  
HITCBC 783 - 22 void await_resume() const noexcept 603 + 15 void await_resume() const noexcept {}
784 - {  
DCB 785 - 22 }  
786   }; 604   };
787   605  
788   } // namespace detail 606   } // namespace detail
789   607  
790 - /** Wait for the first awaitable to complete (range overload). 608 + /** Race a range of io_result-returning awaitables (non-void payloads).
791 -  
792 - Races a range of awaitables with the same result type. Accepts any  
793 - sized input range of IoAwaitable types, enabling use with arrays,  
794 - spans, or custom containers.  
795 -  
796 - @par Suspends  
797 - The calling coroutine suspends when co_await is invoked. All awaitables  
798 - in the range are launched concurrently and execute in parallel. The  
799 - coroutine resumes only after all awaitables have completed, even though  
800 - the winner is determined by the first to finish.  
801   609  
802 - @par Completion Conditions 610 + Only a child returning !ec can win. Errors and exceptions do not
803 - @li Winner is determined when the first awaitable completes (success or exception) 611 + claim winner status. If all children fail, the last failure
804 - @li Only one task can claim winner status via atomic compare-exchange 612 + is reported — either the last error_code at variant index 0,
805 - @li Once a winner exists, stop is requested for all remaining siblings 613 + or the last exception rethrown.
806 - @li Parent coroutine resumes only after all siblings acknowledge completion  
807 - @li The winner's index and result are returned; if the winner threw, the exception is rethrown  
808   614  
809 - @par Cancellation Semantics 615 + @param awaitables Range of io_result-returning awaitables (must
810 - Cancellation is supported via stop_token propagated through the 616 + not be empty).
811 - IoAwaitable protocol:  
812 - @li Each child awaitable receives a stop_token derived from a shared stop_source  
813 - @li When the parent's stop token is activated, the stop is forwarded to all children  
814 - @li When a winner is determined, stop_source_.request_stop() is called immediately  
815 - @li Siblings must handle cancellation gracefully and complete before parent resumes  
816 - @li Stop requests are cooperative; tasks must check and respond to them  
817   617  
818 - @par Concurrency/Overlap 618 + @return A task yielding variant<error_code, pair<size_t, PayloadT>>
819 - All awaitables are launched concurrently before any can complete. 619 + where index 0 is failure and index 1 carries the winner's
820 - The launcher iterates through the range, starting each task on the 620 + index and payload.
821 - caller's executor. Tasks may execute in parallel on multi-threaded  
822 - executors or interleave on single-threaded executors. There is no  
823 - guaranteed ordering of task completion.  
824   621  
825 - @par Notable Error Conditions 622 + @throws std::invalid_argument if range is empty.
826 - @li Empty range: throws std::invalid_argument immediately (not via co_return) 623 + @throws Rethrows last exception when no winner and the last
827 - @li Winner exception: if the winning task threw, that exception is rethrown 624 + failure was an exception.
828 - @li Non-winner exceptions: silently discarded (only winner's result matters)  
829 - @li Cancellation: tasks may complete via cancellation without throwing  
830   625  
831   @par Example 626   @par Example
832   @code 627   @code
833 - task<void> example() { 628 + task<void> example()
834 - std::array<task<Response>, 3> requests = { 629 + {
835 - fetch_from_server(0), 630 + std::vector<io_task<size_t>> reads;
836 - fetch_from_server(1), 631 + for (auto& buf : buffers)
837 - fetch_from_server(2) 632 + reads.push_back(stream.read_some(buf));
838 - };  
839 -  
840 - auto [index, response] = co_await when_any(std::move(requests));  
841 - }  
842 - @endcode  
843 -  
844 - @par Example with Vector  
845 - @code  
846 - task<Response> fetch_fastest(std::vector<Server> const& servers) {  
847 - std::vector<task<Response>> requests;  
848 - for (auto const& server : servers)  
849 - requests.push_back(fetch_from(server));  
850   633  
851 - auto [index, response] = co_await when_any(std::move(requests)); 634 + auto result = co_await when_any(std::move(reads));
852 - co_return response; 635 + if (result.index() == 1)
  636 + {
  637 + auto [idx, n] = std::get<1>(result);
  638 + }
853   } 639   }
854   @endcode 640   @endcode
855   641  
856 - @tparam R Range type satisfying IoAwaitableRange. 642 + @see IoAwaitableRange, when_any
857 - @param awaitables Range of awaitables to race concurrently (must not be empty).  
858 - @return A task yielding a pair of (winner_index, result).  
859 -  
860 - @throws std::invalid_argument if range is empty (thrown before coroutine suspends).  
861 - @throws Rethrows the winner's exception if the winning task threw an exception.  
862 -  
863 - @par Remarks  
864 - Elements are moved from the range; for lvalue ranges, the original  
865 - container will have moved-from elements after this call. The range  
866 - is moved onto the coroutine frame to ensure lifetime safety. Unlike  
867 - the variadic overload, no variant wrapper is needed since all tasks  
868 - share the same return type.  
869 -  
870 - @see when_any, IoAwaitableRange  
871   */ 643   */
872   template<IoAwaitableRange R> 644   template<IoAwaitableRange R>
873 - requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>) 645 + requires detail::is_io_result_v<
  646 + awaitable_result_t<std::ranges::range_value_t<R>>>
  647 + && (!std::is_same_v<
  648 + detail::io_result_payload_t<
  649 + awaitable_result_t<std::ranges::range_value_t<R>>>,
  650 + std::tuple<>>)
HITCBC 874   21 [[nodiscard]] auto when_any(R&& awaitables) 651   14 [[nodiscard]] auto when_any(R&& awaitables)
875 - -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>> 652 + -> task<std::variant<std::error_code,
  653 + std::pair<std::size_t,
  654 + detail::io_result_payload_t<
  655 + awaitable_result_t<std::ranges::range_value_t<R>>>>>>
876   { 656   {
877   using Awaitable = std::ranges::range_value_t<R>; 657   using Awaitable = std::ranges::range_value_t<R>;
878 - using T = detail::awaitable_result_t<Awaitable>; 658 + using PayloadT = detail::io_result_payload_t<
879 - using result_type = std::pair<std::size_t, T>; 659 + awaitable_result_t<Awaitable>>;
  660 + using result_type = std::variant<std::error_code,
  661 + std::pair<std::size_t, PayloadT>>;
880   using OwnedRange = std::remove_cvref_t<R>; 662   using OwnedRange = std::remove_cvref_t<R>;
881   663  
882   auto count = std::ranges::size(awaitables); 664   auto count = std::ranges::size(awaitables);
883   if(count == 0) 665   if(count == 0)
884   throw std::invalid_argument("when_any requires at least one awaitable"); 666   throw std::invalid_argument("when_any requires at least one awaitable");
885 - // Move/copy range onto coroutine frame to ensure lifetime  
886   667  
887   OwnedRange owned_awaitables = std::forward<R>(awaitables); 668   OwnedRange owned_awaitables = std::forward<R>(awaitables);
888   669  
889 - detail::when_any_homogeneous_state<T> state(count); 670 + detail::when_any_io_homogeneous_state<PayloadT> state(count);
890   671  
891 - co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state); 672 + co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
  673 + &owned_awaitables, &state);
892   674  
893 - if(state.core_.winner_exception_) 675 + // Winner found
894 - std::rethrow_exception(state.core_.winner_exception_); 676 + if(state.core_.has_winner_.load(std::memory_order_acquire))
  677 + {
  678 + if(state.core_.winner_exception_)
  679 + std::rethrow_exception(state.core_.winner_exception_);
  680 + co_return result_type{std::in_place_index<1>,
  681 + std::pair{state.core_.winner_index_, std::move(*state.result_)}};
  682 + }
895   683  
896 - co_return result_type{state.core_.winner_index_, std::move(*state.result_)}; 684 + // No winner — report last failure
  685 + if(state.last_exception_)
  686 + std::rethrow_exception(state.last_exception_);
  687 + co_return result_type{std::in_place_index<0>, state.last_error_};
HITCBC 897   42 } 688   28 }
898   689  
899 - /** Wait for the first awaitable to complete (void range overload). 690 + /** Race a range of void io_result-returning awaitables.
900 -  
901 - Races a range of void-returning awaitables. Since void awaitables have  
902 - no result value, only the winner's index is returned.  
903 -  
904 - @par Suspends  
905 - The calling coroutine suspends when co_await is invoked. All awaitables  
906 - in the range are launched concurrently and execute in parallel. The  
907 - coroutine resumes only after all awaitables have completed, even though  
908 - the winner is determined by the first to finish.  
909   691  
910 - @par Completion Conditions 692 + Only a child returning !ec can win. Returns the winner's index
911 - @li Winner is determined when the first awaitable completes (success or exception) 693 + at variant index 1, or error_code at index 0 on all-fail.
912 - @li Only one task can claim winner status via atomic compare-exchange  
913 - @li Once a winner exists, stop is requested for all remaining siblings  
914 - @li Parent coroutine resumes only after all siblings acknowledge completion  
915 - @li The winner's index is returned; if the winner threw, the exception is rethrown  
916   694  
917 - @par Cancellation Semantics 695 + @param awaitables Range of io_result<>-returning awaitables (must
918 - Cancellation is supported via stop_token propagated through the 696 + not be empty).
919 - IoAwaitable protocol:  
920 - @li Each child awaitable receives a stop_token derived from a shared stop_source  
921 - @li When the parent's stop token is activated, the stop is forwarded to all children  
922 - @li When a winner is determined, stop_source_.request_stop() is called immediately  
923 - @li Siblings must handle cancellation gracefully and complete before parent resumes  
924 - @li Stop requests are cooperative; tasks must check and respond to them  
925   697  
926 - @par Concurrency/Overlap 698 + @return A task yielding variant<error_code, size_t> where index 0
927 - All awaitables are launched concurrently before any can complete. 699 + is failure and index 1 carries the winner's index.
928 - The launcher iterates through the range, starting each task on the  
929 - caller's executor. Tasks may execute in parallel on multi-threaded  
930 - executors or interleave on single-threaded executors. There is no  
931 - guaranteed ordering of task completion.  
932   700  
933 - @par Notable Error Conditions 701 + @throws std::invalid_argument if range is empty.
934 - @li Empty range: throws std::invalid_argument immediately (not via co_return) 702 + @throws Rethrows first exception when no winner and at least one
935 - @li Winner exception: if the winning task threw, that exception is rethrown 703 + child threw.
936 - @li Non-winner exceptions: silently discarded (only winner's result matters)  
937 - @li Cancellation: tasks may complete via cancellation without throwing  
938   704  
939   @par Example 705   @par Example
940   @code 706   @code
941 - task<void> example() { 707 + task<void> example()
942 - std::vector<task<void>> tasks; 708 + {
943 - for (int i = 0; i < 5; ++i) 709 + std::vector<io_task<>> jobs;
944 - tasks.push_back(background_work(i)); 710 + jobs.push_back(background_work_a());
945 - 711 + jobs.push_back(background_work_b());
946 - std::size_t winner = co_await when_any(std::move(tasks));  
947 - // winner is the index of the first task to complete  
948 - }  
949 - @endcode  
950 -  
951 - @par Example with Timeout  
952 - @code  
953 - task<void> with_timeout() {  
954 - std::vector<task<void>> tasks;  
955 - tasks.push_back(long_running_operation());  
956 - tasks.push_back(delay(std::chrono::seconds(5)));  
957   712  
958 - std::size_t winner = co_await when_any(std::move(tasks)); 713 + auto result = co_await when_any(std::move(jobs));
959 - if (winner == 1) { 714 + if (result.index() == 1)
960 - // Timeout occurred 715 + {
  716 + auto winner = std::get<1>(result);
961   } 717   }
962   } 718   }
963   @endcode 719   @endcode
964   720  
965 - @tparam R Range type satisfying IoAwaitableRange with void result. 721 + @see IoAwaitableRange, when_any
966 - @param awaitables Range of void awaitables to race concurrently (must not be empty).  
967 - @return A task yielding the winner's index (zero-based).  
968 -  
969 - @throws std::invalid_argument if range is empty (thrown before coroutine suspends).  
970 - @throws Rethrows the winner's exception if the winning task threw an exception.  
971 -  
972 - @par Remarks  
973 - Elements are moved from the range; for lvalue ranges, the original  
974 - container will have moved-from elements after this call. The range  
975 - is moved onto the coroutine frame to ensure lifetime safety. Unlike  
976 - the non-void overload, no result storage is needed since void tasks  
977 - produce no value.  
978 -  
979 - @see when_any, IoAwaitableRange  
980   */ 722   */
981   template<IoAwaitableRange R> 723   template<IoAwaitableRange R>
982 - requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>> 724 + requires detail::is_io_result_v<
ECB 983 - 3 [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t> 725 + awaitable_result_t<std::ranges::range_value_t<R>>>
  726 + && std::is_same_v<
  727 + detail::io_result_payload_t<
  728 + awaitable_result_t<std::ranges::range_value_t<R>>>,
  729 + std::tuple<>>
HITGNC   730 + 2 [[nodiscard]] auto when_any(R&& awaitables)
  731 + -> task<std::variant<std::error_code, std::size_t>>
984   { 732   {
985   using OwnedRange = std::remove_cvref_t<R>; 733   using OwnedRange = std::remove_cvref_t<R>;
  734 + using result_type = std::variant<std::error_code, std::size_t>;
986   735  
987   auto count = std::ranges::size(awaitables); 736   auto count = std::ranges::size(awaitables);
988   if(count == 0) 737   if(count == 0)
989   throw std::invalid_argument("when_any requires at least one awaitable"); 738   throw std::invalid_argument("when_any requires at least one awaitable");
990 - // Move/copy range onto coroutine frame to ensure lifetime  
991   739  
992   OwnedRange owned_awaitables = std::forward<R>(awaitables); 740   OwnedRange owned_awaitables = std::forward<R>(awaitables);
993   741  
994 - detail::when_any_homogeneous_state<void> state(count); 742 + detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
995   743  
996 - co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state); 744 + co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
  745 + &owned_awaitables, &state);
  746 +
  747 + // Winner found
  748 + if(state.core_.has_winner_.load(std::memory_order_acquire))
  749 + {
  750 + if(state.core_.winner_exception_)
  751 + std::rethrow_exception(state.core_.winner_exception_);
  752 + co_return result_type{std::in_place_index<1>,
  753 + state.core_.winner_index_};
  754 + }
  755 +
  756 + // No winner — report last failure
  757 + if(state.last_exception_)
  758 + std::rethrow_exception(state.last_exception_);
  759 + co_return result_type{std::in_place_index<0>, state.last_error_};
HITGNC   760 + 4 }
  761 +
  762 + /** Race io_result-returning awaitables, selecting the first success.
  763 +
  764 + Overload selected when all children return io_result<Ts...>.
  765 + Only a child returning !ec can win. Errors and exceptions do
  766 + not claim winner status.
  767 +
  768 + @return A task yielding variant<error_code, R1, ..., Rn> where
  769 + index 0 is the failure/no-winner case and index i+1
  770 + identifies the winning child.
  771 + */
  772 + template<IoAwaitable... As>
  773 + requires (sizeof...(As) > 0)
  774 + && detail::all_io_result_awaitables<As...>
HITGNC   775 + 16 [[nodiscard]] auto when_any(As... as)
  776 + -> task<std::variant<
  777 + std::error_code,
  778 + detail::io_result_payload_t<awaitable_result_t<As>>...>>
  779 + {
  780 + using result_type = std::variant<
  781 + std::error_code,
  782 + detail::io_result_payload_t<awaitable_result_t<As>>...>;
  783 +
  784 + detail::when_any_io_state<
  785 + detail::io_result_payload_t<awaitable_result_t<As>>...> state;
  786 + std::tuple<As...> awaitable_tuple(std::move(as)...);
  787 +
  788 + co_await detail::when_any_io_launcher<As...>(
  789 + &awaitable_tuple, &state);
  790 +
  791 + // Winner found: return their result
  792 + if(state.result_.has_value())
  793 + co_return std::move(*state.result_);
997   794  
  795 + // Winner claimed but payload construction failed
998   if(state.core_.winner_exception_) 796   if(state.core_.winner_exception_)
999   std::rethrow_exception(state.core_.winner_exception_); 797   std::rethrow_exception(state.core_.winner_exception_);
1000   798  
1001 - co_return state.core_.winner_index_; 799 + // No winner — report last failure
  800 + if(state.last_exception_)
  801 + std::rethrow_exception(state.last_exception_);
  802 + co_return result_type{std::in_place_index<0>, state.last_error_};
HITCBC 1002   6 } 803   32 }
1003   804  
1004   } // namespace capy 805   } // namespace capy
1005   } // namespace boost 806   } // namespace boost
1006   807  
1007   #endif 808   #endif