100.00% Lines (28/28) 100.00% Functions (13/13)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_EX_STRAND_HPP 10   #ifndef BOOST_CAPY_EX_STRAND_HPP
11   #define BOOST_CAPY_EX_STRAND_HPP 11   #define BOOST_CAPY_EX_STRAND_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
  14 + #include <boost/capy/continuation.hpp>
14   #include <coroutine> 15   #include <coroutine>
15   #include <boost/capy/ex/detail/strand_service.hpp> 16   #include <boost/capy/ex/detail/strand_service.hpp>
16   17  
17   #include <type_traits> 18   #include <type_traits>
18   19  
19   namespace boost { 20   namespace boost {
20   namespace capy { 21   namespace capy {
21 - //----------------------------------------------------------  
22 -  
23   22  
24   /** Provides serialized coroutine execution for any executor type. 23   /** Provides serialized coroutine execution for any executor type.
25   24  
26   A strand wraps an inner executor and ensures that coroutines 25   A strand wraps an inner executor and ensures that coroutines
27   dispatched through it never run concurrently. At most one 26   dispatched through it never run concurrently. At most one
28   coroutine executes at a time within a strand, even when the 27   coroutine executes at a time within a strand, even when the
29   underlying executor runs on multiple threads. 28   underlying executor runs on multiple threads.
30   29  
31   Strands are lightweight handles that can be copied freely. 30   Strands are lightweight handles that can be copied freely.
32   Copies share the same internal serialization state, so 31   Copies share the same internal serialization state, so
33   coroutines dispatched through any copy are serialized with 32   coroutines dispatched through any copy are serialized with
34   respect to all other copies. 33   respect to all other copies.
35   34  
36   @par Invariant 35   @par Invariant
37   Coroutines resumed through a strand shall not run concurrently. 36   Coroutines resumed through a strand shall not run concurrently.
38   37  
39   @par Implementation 38   @par Implementation
40 - The strand uses a service-based architecture with a fixed pool 39 + Each strand allocates a private serialization state. Strands
41 - of 211 implementation objects. New strands hash to select an 40 + constructed from the same execution context share a small pool
42 - impl from the pool. Strands that hash to the same index share 41 + of mutexes (193 entries) selected by hash; mutex sharing causes
43 - serialization, which is harmless (just extra serialization) 42 + only brief contention on the push/pop critical section, never
44 - and rare with 211 buckets. 43 + cross-strand state sharing. Construction cost: one
  44 + `std::make_shared` per strand.
45   45  
46   @par Executor Concept 46   @par Executor Concept
47   This class satisfies the `Executor` concept, providing: 47   This class satisfies the `Executor` concept, providing:
48   - `context()` - Returns the underlying execution context 48   - `context()` - Returns the underlying execution context
49   - `on_work_started()` / `on_work_finished()` - Work tracking 49   - `on_work_started()` / `on_work_finished()` - Work tracking
50 - - `dispatch(h)` - May run immediately if strand is idle 50 + - `dispatch(continuation&)` - May run immediately if strand is idle
51 - - `post(h)` - Always queues for later execution 51 + - `post(continuation&)` - Always queues for later execution
52   52  
53   @par Thread Safety 53   @par Thread Safety
54   Distinct objects: Safe. 54   Distinct objects: Safe.
55   Shared objects: Safe. 55   Shared objects: Safe.
56   56  
57   @par Example 57   @par Example
58   @code 58   @code
59   thread_pool pool(4); 59   thread_pool pool(4);
60   auto strand = make_strand(pool.get_executor()); 60   auto strand = make_strand(pool.get_executor());
61   61  
62 - // These coroutines will never run concurrently 62 + // Continuations are linked intrusively into the strand's queue,
63 - strand.post(coro1); 63 + // so each one must outlive its time there. Storage is typically
64 - strand.post(coro2); 64 + // owned by the awaitable or operation state that posted it.
65 - strand.post(coro3); 65 + continuation c1{h1}, c2{h2}, c3{h3};
  66 + strand.post(c1);
  67 + strand.post(c2);
  68 + strand.post(c3);
66   @endcode 69   @endcode
67   70  
68   @tparam E The type of the underlying executor. Must 71   @tparam E The type of the underlying executor. Must
69   satisfy the `Executor` concept. 72   satisfy the `Executor` concept.
70   73  
71   @see make_strand, Executor 74   @see make_strand, Executor
72   */ 75   */
73   template<typename Ex> 76   template<typename Ex>
74   class strand 77   class strand
75   { 78   {
76 - detail::strand_impl* impl_; 79 + std::shared_ptr<detail::strand_impl> impl_;
77   Ex ex_; 80   Ex ex_;
78   81  
  82 + friend struct strand_test;
  83 +
79   public: 84   public:
80   /** The type of the underlying executor. 85   /** The type of the underlying executor.
81   */ 86   */
82   using inner_executor_type = Ex; 87   using inner_executor_type = Ex;
83   88  
84   /** Construct a strand for the specified executor. 89   /** Construct a strand for the specified executor.
85   90  
86 - Obtains a strand implementation from the service associated 91 + Allocates a fresh strand implementation from the service
87 - with the executor's context. The implementation is selected 92 + associated with the executor's context.
88 - from a fixed pool using a hash function.  
89   93  
90   @param ex The inner executor to wrap. Coroutines will 94   @param ex The inner executor to wrap. Coroutines will
91   ultimately be dispatched through this executor. 95   ultimately be dispatched through this executor.
92   96  
93   @note This constructor is disabled if the argument is a 97   @note This constructor is disabled if the argument is a
94   strand type, to prevent strand-of-strand wrapping. 98   strand type, to prevent strand-of-strand wrapping.
95   */ 99   */
96   template<typename Ex1, 100   template<typename Ex1,
97   typename = std::enable_if_t< 101   typename = std::enable_if_t<
98   !std::is_same_v<std::decay_t<Ex1>, strand> && 102   !std::is_same_v<std::decay_t<Ex1>, strand> &&
99   !detail::is_strand<std::decay_t<Ex1>>::value && 103   !detail::is_strand<std::decay_t<Ex1>>::value &&
100   std::is_convertible_v<Ex1, Ex>>> 104   std::is_convertible_v<Ex1, Ex>>>
101   explicit 105   explicit
HITCBC 102   23 strand(Ex1&& ex) 106   11442 strand(Ex1&& ex)
HITCBC 103   23 : impl_(detail::get_strand_service(ex.context()) 107   11442 : impl_(detail::get_strand_service(ex.context())
HITCBC 104 - 23 .get_implementation()) 108 + 11442 .create_implementation())
HITCBC 105   23 , ex_(std::forward<Ex1>(ex)) 109   11442 , ex_(std::forward<Ex1>(ex))
106   { 110   {
HITCBC 107   23 } 111   11442 }
108   112  
109 - /** Copy constructor. 113 + /** Construct a copy.
110   114  
111   Creates a strand that shares serialization state with 115   Creates a strand that shares serialization state with
112   the original. Coroutines dispatched through either strand 116   the original. Coroutines dispatched through either strand
113   will be serialized with respect to each other. 117   will be serialized with respect to each other.
114   */ 118   */
HITCBC 115   1 strand(strand const&) = default; 119   9 strand(strand const&) = default;
116   120  
117 - /** Move constructor. 121 + /** Construct by moving.
  122 +
  123 + @note A moved-from strand is only safe to destroy
  124 + or reassign.
118   */ 125   */
HITGIC 119   strand(strand&&) = default; 126   11443 strand(strand&&) = default;
120   127  
121 - /** Copy assignment operator. 128 + /** Assign by copying.
122   */ 129   */
HITGIC 123   strand& operator=(strand const&) = default; 130   1 strand& operator=(strand const&) = default;
124   131  
125 - /** Move assignment operator. 132 + /** Assign by moving.
  133 +
  134 + @note A moved-from strand is only safe to destroy
  135 + or reassign.
126   */ 136   */
HITGIC 127   strand& operator=(strand&&) = default; 137   1 strand& operator=(strand&&) = default;
128   138  
129   /** Return the underlying executor. 139   /** Return the underlying executor.
130   140  
131   @return A const reference to the inner executor. 141   @return A const reference to the inner executor.
132   */ 142   */
133   Ex const& 143   Ex const&
HITCBC 134   1 get_inner_executor() const noexcept 144   1 get_inner_executor() const noexcept
135   { 145   {
HITCBC 136   1 return ex_; 146   1 return ex_;
137   } 147   }
138   148  
139   /** Return the underlying execution context. 149   /** Return the underlying execution context.
140   150  
141   @return A reference to the execution context associated 151   @return A reference to the execution context associated
142   with the inner executor. 152   with the inner executor.
143   */ 153   */
144   auto& 154   auto&
HITCBC 145   1 context() const noexcept 155   5 context() const noexcept
146   { 156   {
HITCBC 147   1 return ex_.context(); 157   5 return ex_.context();
148   } 158   }
149   159  
150   /** Notify that work has started. 160   /** Notify that work has started.
151   161  
152   Delegates to the inner executor's `on_work_started()`. 162   Delegates to the inner executor's `on_work_started()`.
153   This is a no-op for most executor types. 163   This is a no-op for most executor types.
154   */ 164   */
155   void 165   void
HITCBC 156   2 on_work_started() const noexcept 166   6 on_work_started() const noexcept
157   { 167   {
HITCBC 158   2 ex_.on_work_started(); 168   6 ex_.on_work_started();
HITCBC 159   2 } 169   6 }
160   170  
161   /** Notify that work has finished. 171   /** Notify that work has finished.
162   172  
163   Delegates to the inner executor's `on_work_finished()`. 173   Delegates to the inner executor's `on_work_finished()`.
164   This is a no-op for most executor types. 174   This is a no-op for most executor types.
165   */ 175   */
166   void 176   void
HITCBC 167   2 on_work_finished() const noexcept 177   6 on_work_finished() const noexcept
168   { 178   {
HITCBC 169   2 ex_.on_work_finished(); 179   6 ex_.on_work_finished();
HITCBC 170   2 } 180   6 }
171   181  
172   /** Determine whether the strand is running in the current thread. 182   /** Determine whether the strand is running in the current thread.
173   183  
174   @return true if the current thread is executing a coroutine 184   @return true if the current thread is executing a coroutine
175   within this strand's dispatch loop. 185   within this strand's dispatch loop.
176   */ 186   */
177   bool 187   bool
HITCBC 178   1 running_in_this_thread() const noexcept 188   4 running_in_this_thread() const noexcept
179   { 189   {
HITCBC 180   1 return detail::strand_service::running_in_this_thread(*impl_); 190   4 return detail::strand_service::running_in_this_thread(*impl_);
181   } 191   }
182   192  
183   /** Compare two strands for equality. 193   /** Compare two strands for equality.
184   194  
185   Two strands are equal if they share the same internal 195   Two strands are equal if they share the same internal
186   serialization state. Equal strands serialize coroutines 196   serialization state. Equal strands serialize coroutines
187   with respect to each other. 197   with respect to each other.
188   198  
189   @param other The strand to compare against. 199   @param other The strand to compare against.
190   @return true if both strands share the same implementation. 200   @return true if both strands share the same implementation.
191   */ 201   */
192   bool 202   bool
HITCBC 193   4 operator==(strand const& other) const noexcept 203   499505 operator==(strand const& other) const noexcept
194   { 204   {
HITCBC 195 - 4 return impl_ == other.impl_; 205 + 499505 return impl_.get() == other.impl_.get();
196   } 206   }
197   207  
198 - /** Post a coroutine to the strand. 208 + /** Post a continuation to the strand.
199   209  
200 - The coroutine is always queued for execution, never resumed 210 + The continuation is always queued for execution, never resumed
201   immediately. When the strand becomes available, queued 211   immediately. When the strand becomes available, queued
202 - coroutines execute in FIFO order on the underlying executor. 212 + work executes in FIFO order on the underlying executor.
203   213  
204   @par Ordering 214   @par Ordering
205   Guarantees strict FIFO ordering relative to other post() calls. 215   Guarantees strict FIFO ordering relative to other post() calls.
206   Use this instead of dispatch() when ordering matters. 216   Use this instead of dispatch() when ordering matters.
207   217  
208 - @param h The coroutine handle to post. 218 + @param c The continuation to post. The caller retains
  219 + ownership; the continuation must remain valid until
  220 + it is dequeued and resumed.
209   */ 221   */
210   void 222   void
HITCBC 211 - 321 post(std::coroutine_handle<> h) const 223 + 30335 post(continuation& c) const
212   { 224   {
HITCBC 213 - 321 detail::strand_service::post(*impl_, executor_ref(ex_), h); 225 + 30335 detail::strand_service::post(impl_, executor_ref(ex_), c);
HITCBC 214   321 } 226   30335 }
215   227  
216 - /** Dispatch a coroutine through the strand. 228 + /** Dispatch a continuation through the strand.
217   229  
218   Returns a handle for symmetric transfer. If the calling 230   Returns a handle for symmetric transfer. If the calling
219 - thread is already executing within this strand, returns `h`. 231 + thread is already executing within this strand, returns `c.h`.
220 - Otherwise, the coroutine is queued and 232 + Otherwise, the continuation is queued and
221   `std::noop_coroutine()` is returned. 233   `std::noop_coroutine()` is returned.
222   234  
223   @par Ordering 235   @par Ordering
224   Callers requiring strict FIFO ordering should use post() 236   Callers requiring strict FIFO ordering should use post()
225 - instead, which always queues the coroutine. 237 + instead, which always queues the continuation.
226   238  
227 - @param h The coroutine handle to dispatch. 239 + @param c The continuation to dispatch. The caller retains
  240 + ownership; the continuation must remain valid until
  241 + it is dequeued and resumed.
228   242  
229   @return A handle for symmetric transfer or `std::noop_coroutine()`. 243   @return A handle for symmetric transfer or `std::noop_coroutine()`.
230   */ 244   */
231   std::coroutine_handle<> 245   std::coroutine_handle<>
HITCBC 232 - 1 dispatch(std::coroutine_handle<> h) const 246 + 8 dispatch(continuation& c) const
233   { 247   {
HITCBC 234 - 1 return detail::strand_service::dispatch(*impl_, executor_ref(ex_), h); 248 + 8 return detail::strand_service::dispatch(impl_, executor_ref(ex_), c);
235   } 249   }
236   }; 250   };
237   251  
238   // Deduction guide 252   // Deduction guide
239   template<typename Ex> 253   template<typename Ex>
240   strand(Ex) -> strand<Ex>; 254   strand(Ex) -> strand<Ex>;
241   255  
242   } // namespace capy 256   } // namespace capy
243   } // namespace boost 257   } // namespace boost
244   258  
245   #endif 259   #endif