100.00% Lines (66/66) 100.00% Functions (12/12)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_ASYNC_EVENT_HPP 10   #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11   #define BOOST_CAPY_ASYNC_EVENT_HPP 11   #define BOOST_CAPY_ASYNC_EVENT_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/detail/intrusive.hpp> 14   #include <boost/capy/detail/intrusive.hpp>
  15 + #include <boost/capy/continuation.hpp>
15   #include <boost/capy/concept/executor.hpp> 16   #include <boost/capy/concept/executor.hpp>
16   #include <boost/capy/error.hpp> 17   #include <boost/capy/error.hpp>
17   #include <boost/capy/ex/io_env.hpp> 18   #include <boost/capy/ex/io_env.hpp>
18   #include <boost/capy/io_result.hpp> 19   #include <boost/capy/io_result.hpp>
19   20  
20   #include <stop_token> 21   #include <stop_token>
21   22  
22   #include <atomic> 23   #include <atomic>
23   #include <coroutine> 24   #include <coroutine>
24   #include <new> 25   #include <new>
25   #include <utility> 26   #include <utility>
26   27  
27   /* async_event implementation notes 28   /* async_event implementation notes
28   ================================= 29   =================================
29   30  
30   Same cancellation pattern as async_mutex (see that file for the 31   Same cancellation pattern as async_mutex (see that file for the
31   full discussion on claimed_, stop_cb lifetime, member ordering, 32   full discussion on claimed_, stop_cb lifetime, member ordering,
32   and threading assumptions). 33   and threading assumptions).
33   34  
34   Key difference: set() wakes ALL waiters (broadcast), not one. 35   Key difference: set() wakes ALL waiters (broadcast), not one.
35   It pops every waiter from the list and posts the ones it 36   It pops every waiter from the list and posts the ones it
36   claims. Waiters already claimed by a stop callback are skipped. 37   claims. Waiters already claimed by a stop callback are skipped.
37   38  
38   Because set() pops all waiters, a canceled waiter may have been 39   Because set() pops all waiters, a canceled waiter may have been
39   removed from the list by set() before its await_resume runs. 40   removed from the list by set() before its await_resume runs.
40   This requires a separate in_list_ flag (unlike async_mutex where 41   This requires a separate in_list_ flag (unlike async_mutex where
41   active_ served double duty). await_resume only calls remove() 42   active_ served double duty). await_resume only calls remove()
42   when in_list_ is true. 43   when in_list_ is true.
43   */ 44   */
44   45  
45   namespace boost { 46   namespace boost {
46   namespace capy { 47   namespace capy {
47   48  
48   /** An asynchronous event for coroutines. 49   /** An asynchronous event for coroutines.
49   50  
50   This event provides a way to notify multiple coroutines that some 51   This event provides a way to notify multiple coroutines that some
51   condition has occurred. When a coroutine awaits an unset event, it 52   condition has occurred. When a coroutine awaits an unset event, it
52   suspends and is added to a wait queue. When the event is set, all 53   suspends and is added to a wait queue. When the event is set, all
53   waiting coroutines are resumed. 54   waiting coroutines are resumed.
54   55  
55   @par Cancellation 56   @par Cancellation
56   57  
57   When a coroutine is suspended waiting for the event and its stop 58   When a coroutine is suspended waiting for the event and its stop
58   token is triggered, the waiter completes with `error::canceled` 59   token is triggered, the waiter completes with `error::canceled`
59   instead of waiting for `set()`. 60   instead of waiting for `set()`.
60   61  
61   Cancellation only applies while the coroutine is suspended in the 62   Cancellation only applies while the coroutine is suspended in the
62   wait queue. If the event is already set when `wait()` is called, 63   wait queue. If the event is already set when `wait()` is called,
63   the wait completes immediately even if the stop token is already 64   the wait completes immediately even if the stop token is already
64   signaled. 65   signaled.
65   66  
66   @par Zero Allocation 67   @par Zero Allocation
67   68  
68   No heap allocation occurs for wait operations. 69   No heap allocation occurs for wait operations.
69   70  
70   @par Thread Safety 71   @par Thread Safety
71   72  
  73 + Distinct objects: Safe.@n
  74 + Shared objects: Unsafe.
  75 +
72   The event operations are designed for single-threaded use on one 76   The event operations are designed for single-threaded use on one
73   executor. The stop callback may fire from any thread. 77   executor. The stop callback may fire from any thread.
74   78  
  79 + This type is non-copyable and non-movable because suspended
  80 + waiters hold intrusive pointers into the event's internal list.
  81 +
75   @par Example 82   @par Example
76   @code 83   @code
77   async_event event; 84   async_event event;
78   85  
79   task<> waiter() { 86   task<> waiter() {
80   auto [ec] = co_await event.wait(); 87   auto [ec] = co_await event.wait();
81   if(ec) 88   if(ec)
82   co_return; 89   co_return;
83   // ... event was set ... 90   // ... event was set ...
84   } 91   }
85   92  
86   task<> notifier() { 93   task<> notifier() {
87   // ... do some work ... 94   // ... do some work ...
88   event.set(); // Wake all waiters 95   event.set(); // Wake all waiters
89   } 96   }
90   @endcode 97   @endcode
91   */ 98   */
92   class async_event 99   class async_event
93   { 100   {
94   public: 101   public:
95   class wait_awaiter; 102   class wait_awaiter;
96   103  
97   private: 104   private:
98   bool set_ = false; 105   bool set_ = false;
99   detail::intrusive_list<wait_awaiter> waiters_; 106   detail::intrusive_list<wait_awaiter> waiters_;
100   107  
101   public: 108   public:
102   /** Awaiter returned by wait(). 109   /** Awaiter returned by wait().
103   */ 110   */
104   class wait_awaiter 111   class wait_awaiter
105   : public detail::intrusive_list<wait_awaiter>::node 112   : public detail::intrusive_list<wait_awaiter>::node
106   { 113   {
107   friend class async_event; 114   friend class async_event;
108   115  
109   async_event* e_; 116   async_event* e_;
110 - std::coroutine_handle<> h_; 117 + continuation cont_;
111   executor_ref ex_; 118   executor_ref ex_;
112   119  
113   // Declared before stop_cb_buf_: the callback 120   // Declared before stop_cb_buf_: the callback
114   // accesses these members, so they must still be 121   // accesses these members, so they must still be
115   // alive if the stop_cb_ destructor blocks. 122   // alive if the stop_cb_ destructor blocks.
116   std::atomic<bool> claimed_{false}; 123   std::atomic<bool> claimed_{false};
117   bool canceled_ = false; 124   bool canceled_ = false;
118   bool active_ = false; 125   bool active_ = false;
119   bool in_list_ = false; 126   bool in_list_ = false;
120   127  
121   struct cancel_fn 128   struct cancel_fn
122   { 129   {
123   wait_awaiter* self_; 130   wait_awaiter* self_;
124   131  
HITCBC 125   21 void operator()() const noexcept 132   4 void operator()() const noexcept
126   { 133   {
HITCBC 127   21 if(!self_->claimed_.exchange( 134   4 if(!self_->claimed_.exchange(
128   true, std::memory_order_acq_rel)) 135   true, std::memory_order_acq_rel))
129   { 136   {
HITCBC 130   20 self_->canceled_ = true; 137   3 self_->canceled_ = true;
HITCBC 131 - 20 self_->ex_.post(self_->h_); 138 + 3 self_->ex_.post(self_->cont_);
132   } 139   }
HITCBC 133   21 } 140   4 }
134   }; 141   };
135   142  
136   using stop_cb_t = 143   using stop_cb_t =
137   std::stop_callback<cancel_fn>; 144   std::stop_callback<cancel_fn>;
138   145  
139   // Aligned storage for stop_cb_t. Declared last: 146   // Aligned storage for stop_cb_t. Declared last:
140   // its destructor may block while the callback 147   // its destructor may block while the callback
141   // accesses the members above. 148   // accesses the members above.
142 - #ifdef _MSC_VER 149 + BOOST_CAPY_MSVC_WARNING_PUSH
143 - # pragma warning(push) 150 + BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
144 - # pragma warning(disable: 4324) // padded due to alignas  
145 - #endif  
146   alignas(stop_cb_t) 151   alignas(stop_cb_t)
147   unsigned char stop_cb_buf_[sizeof(stop_cb_t)]; 152   unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
148 - #ifdef _MSC_VER 153 + BOOST_CAPY_MSVC_WARNING_POP
149 - # pragma warning(pop)  
150 - #endif  
151   154  
HITCBC 152   37 stop_cb_t& stop_cb_() noexcept 155   20 stop_cb_t& stop_cb_() noexcept
153   { 156   {
154   return *reinterpret_cast<stop_cb_t*>( 157   return *reinterpret_cast<stop_cb_t*>(
HITCBC 155   37 stop_cb_buf_); 158   20 stop_cb_buf_);
156   } 159   }
157   160  
158   public: 161   public:
HITCBC 159   251 ~wait_awaiter() 162   52 ~wait_awaiter()
160   { 163   {
HITCBC 161   251 if(active_) 164   52 if(active_)
HITCBC 162   1 stop_cb_().~stop_cb_t(); 165   1 stop_cb_().~stop_cb_t();
HITCBC 163   251 if(in_list_) 166   52 if(in_list_)
HITCBC 164   1 e_->waiters_.remove(this); 167   1 e_->waiters_.remove(this);
HITCBC 165   251 } 168   52 }
166   169  
HITCBC 167   57 explicit wait_awaiter(async_event* e) noexcept 170   25 explicit wait_awaiter(async_event* e) noexcept
HITCBC 168   57 : e_(e) 171   25 : e_(e)
169   { 172   {
HITCBC 170   57 } 173   25 }
171   174  
HITCBC 172   194 wait_awaiter(wait_awaiter&& o) noexcept 175   27 wait_awaiter(wait_awaiter&& o) noexcept
HITCBC 173   194 : e_(o.e_) 176   27 : e_(o.e_)
HITCBC 174 - 194 , h_(o.h_) 177 + 27 , cont_(o.cont_)
HITCBC 175   194 , ex_(o.ex_) 178   27 , ex_(o.ex_)
HITCBC 176   194 , claimed_(o.claimed_.load( 179   27 , claimed_(o.claimed_.load(
177   std::memory_order_relaxed)) 180   std::memory_order_relaxed))
HITCBC 178   194 , canceled_(o.canceled_) 181   27 , canceled_(o.canceled_)
HITCBC 179   194 , active_(std::exchange(o.active_, false)) 182   27 , active_(std::exchange(o.active_, false))
HITCBC 180   194 , in_list_(std::exchange(o.in_list_, false)) 183   27 , in_list_(std::exchange(o.in_list_, false))
181   { 184   {
HITCBC 182   194 } 185   27 }
183   186  
184   wait_awaiter(wait_awaiter const&) = delete; 187   wait_awaiter(wait_awaiter const&) = delete;
185   wait_awaiter& operator=(wait_awaiter const&) = delete; 188   wait_awaiter& operator=(wait_awaiter const&) = delete;
186   wait_awaiter& operator=(wait_awaiter&&) = delete; 189   wait_awaiter& operator=(wait_awaiter&&) = delete;
187   190  
HITCBC 188   57 bool await_ready() const noexcept 191   25 bool await_ready() const noexcept
189   { 192   {
HITCBC 190   57 return e_->set_; 193   25 return e_->set_;
191   } 194   }
192   195  
193   /** IoAwaitable protocol overload. */ 196   /** IoAwaitable protocol overload. */
194   std::coroutine_handle<> 197   std::coroutine_handle<>
HITCBC 195   47 await_suspend( 198   21 await_suspend(
196   std::coroutine_handle<> h, 199   std::coroutine_handle<> h,
197   io_env const* env) noexcept 200   io_env const* env) noexcept
198   { 201   {
HITCBC 199   47 if(env->stop_token.stop_requested()) 202   21 if(env->stop_token.stop_requested())
200   { 203   {
HITCBC 201   10 canceled_ = true; 204   1 canceled_ = true;
HITCBC 202   10 return h; 205   1 return h;
203   } 206   }
HITCBC 204 - 37 h_ = h; 207 + 20 cont_.h = h;
HITCBC 205   37 ex_ = env->executor; 208   20 ex_ = env->executor;
HITCBC 206   37 e_->waiters_.push_back(this); 209   20 e_->waiters_.push_back(this);
HITCBC 207   37 in_list_ = true; 210   20 in_list_ = true;
HITCBC 208   111 ::new(stop_cb_buf_) stop_cb_t( 211   60 ::new(stop_cb_buf_) stop_cb_t(
HITCBC 209   37 env->stop_token, cancel_fn{this}); 212   20 env->stop_token, cancel_fn{this});
HITCBC 210   37 active_ = true; 213   20 active_ = true;
HITCBC 211   37 return std::noop_coroutine(); 214   20 return std::noop_coroutine();
212   } 215   }
213   216  
HITCBC 214   54 io_result<> await_resume() noexcept 217   22 io_result<> await_resume() noexcept
215   { 218   {
HITCBC 216   54 if(active_) 219   22 if(active_)
217   { 220   {
HITCBC 218   36 stop_cb_().~stop_cb_t(); 221   19 stop_cb_().~stop_cb_t();
HITCBC 219   36 active_ = false; 222   19 active_ = false;
220   } 223   }
HITCBC 221   54 if(canceled_) 224   22 if(canceled_)
222   { 225   {
HITCBC 223   30 if(in_list_) 226   4 if(in_list_)
224   { 227   {
HITCBC 225   20 e_->waiters_.remove(this); 228   3 e_->waiters_.remove(this);
HITCBC 226   20 in_list_ = false; 229   3 in_list_ = false;
227   } 230   }
ECB 228   30 return {make_error_code( 231   return {make_error_code(
HITCBC 229   30 error::canceled)}; 232   4 error::canceled)};
230   } 233   }
HITCBC 231   24 return {{}}; 234   18 return {{}};
232   } 235   }
233   }; 236   };
234   237  
  238 + /// Construct an unset event.
ECB 235   20 async_event() = default; 239   async_event() = default;
236   240  
237 - // Non-copyable, non-movable 241 + /// Copy constructor (deleted).
238   async_event(async_event const&) = delete; 242   async_event(async_event const&) = delete;
  243 +
  244 + /// Copy assignment (deleted).
239   async_event& operator=(async_event const&) = delete; 245   async_event& operator=(async_event const&) = delete;
240   246  
  247 + /// Move constructor (deleted).
  248 + async_event(async_event&&) = delete;
  249 +
  250 + /// Move assignment (deleted).
  251 + async_event& operator=(async_event&&) = delete;
  252 +
241   /** Returns an awaiter that waits until the event is set. 253   /** Returns an awaiter that waits until the event is set.
242   254  
243   If the event is already set, completes immediately. 255   If the event is already set, completes immediately.
244   256  
245 - @return An awaitable yielding `(error_code)`. 257 + @return An awaitable that await-returns `(error_code)`.
246   */ 258   */
HITCBC 247   57 wait_awaiter wait() noexcept 259   25 wait_awaiter wait() noexcept
248   { 260   {
HITCBC 249   57 return wait_awaiter{this}; 261   25 return wait_awaiter{this};
250   } 262   }
251   263  
252   /** Sets the event. 264   /** Sets the event.
253   265  
254   All waiting coroutines are resumed. Canceled waiters 266   All waiting coroutines are resumed. Canceled waiters
255   are skipped. Subsequent calls to wait() complete 267   are skipped. Subsequent calls to wait() complete
256   immediately until clear() is called. 268   immediately until clear() is called.
257   */ 269   */
HITCBC 258   23 void set() 270   17 void set()
259   { 271   {
HITCBC 260   23 set_ = true; 272   17 set_ = true;
261   for(;;) 273   for(;;)
262   { 274   {
HITCBC 263   39 auto* w = waiters_.pop_front(); 275   33 auto* w = waiters_.pop_front();
HITCBC 264   39 if(!w) 276   33 if(!w)
HITCBC 265   23 break; 277   17 break;
HITCBC 266   16 w->in_list_ = false; 278   16 w->in_list_ = false;
HITCBC 267   16 if(!w->claimed_.exchange( 279   16 if(!w->claimed_.exchange(
268   true, std::memory_order_acq_rel)) 280   true, std::memory_order_acq_rel))
269   { 281   {
HITCBC 270 - 16 w->ex_.post(w->h_); 282 + 16 w->ex_.post(w->cont_);
271   } 283   }
HITCBC 272   16 } 284   16 }
HITCBC 273   23 } 285   17 }
274   286  
275   /** Clears the event. 287   /** Clears the event.
276   288  
277   Subsequent calls to wait() will suspend until 289   Subsequent calls to wait() will suspend until
278   set() is called again. 290   set() is called again.
279   */ 291   */
HITCBC 280   2 void clear() noexcept 292   2 void clear() noexcept
281   { 293   {
HITCBC 282   2 set_ = false; 294   2 set_ = false;
HITCBC 283   2 } 295   2 }
284   296  
285   /** Returns true if the event is currently set. 297   /** Returns true if the event is currently set.
286   */ 298   */
HITCBC 287   9 bool is_set() const noexcept 299   9 bool is_set() const noexcept
288   { 300   {
HITCBC 289   9 return set_; 301   9 return set_;
290   } 302   }
291   }; 303   };
292   304  
293   } // namespace capy 305   } // namespace capy
294   } // namespace boost 306   } // namespace boost
295   307  
296   #endif 308   #endif