95.61% Lines (109/114) 95.83% Functions (23/24)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10 - #include "src/ex/detail/strand_queue.hpp" 10 + #include "src/ex/detail/strand_impl.hpp"
11   #include <boost/capy/ex/detail/strand_service.hpp> 11   #include <boost/capy/ex/detail/strand_service.hpp>
12 - #include <atomic> 12 + #include <boost/capy/continuation.hpp>
13   #include <coroutine> 13   #include <coroutine>
14 - #include <mutex> 14 + #include <memory>
15 - #include <thread>  
16   #include <utility> 15   #include <utility>
17   16  
18   namespace boost { 17   namespace boost {
19   namespace capy { 18   namespace capy {
20   namespace detail { 19   namespace detail {
21   20  
22 - //---------------------------------------------------------- 21 + // Sentinel stored in invoker_frame_cache_ after shutdown to prevent
23 - 22 + // in-flight invokers from repopulating a freed cache slot.
24 - /** Implementation state for a strand. 23 + inline void* const kCacheClosed = reinterpret_cast<void*>(1);
25 -  
26 - Each strand_impl provides serialization for coroutines  
27 - dispatched through strands that share it.  
28 - */  
29 - struct strand_impl  
30 - {  
31 - std::mutex mutex_;  
32 - strand_queue pending_;  
33 - bool locked_ = false;  
34 - std::atomic<std::thread::id> dispatch_thread_{};  
35 - void* cached_frame_ = nullptr;  
36 - };  
37 -  
38 - //----------------------------------------------------------  
39 -  
40 - /** Invoker coroutine for strand dispatch.  
41 -  
42 - Uses custom allocator to recycle frame - one allocation  
43 - per strand_impl lifetime, stored in trailer for recovery.  
44 - */  
45 - struct strand_invoker  
46 - {  
47 - struct promise_type  
48 - {  
DCB 49 - 9 void* operator new(std::size_t n, strand_impl& impl)  
50 - {  
DCB 51 - 9 constexpr auto A = alignof(strand_impl*);  
DCB 52 - 9 std::size_t padded = (n + A - 1) & ~(A - 1);  
DCB 53 - 9 std::size_t total = padded + sizeof(strand_impl*);  
54 -  
DCB 55 - 9 void* p = impl.cached_frame_  
DCB 56 - 9 ? std::exchange(impl.cached_frame_, nullptr)  
DCB 57 - 7 : ::operator new(total);  
58 -  
59 - // Trailer lets delete recover impl  
DCB 60 - 9 *reinterpret_cast<strand_impl**>(  
DCB 61 - 9 static_cast<char*>(p) + padded) = &impl;  
DCB 62 - 9 return p;  
63 - }  
64 -  
DCB 65 - 9 void operator delete(void* p, std::size_t n) noexcept  
66 - {  
DCB 67 - 9 constexpr auto A = alignof(strand_impl*);  
DCB 68 - 9 std::size_t padded = (n + A - 1) & ~(A - 1);  
69 -  
DCB 70 - 9 auto* impl = *reinterpret_cast<strand_impl**>(  
71 - static_cast<char*>(p) + padded);  
72 -  
DCB 73 - 9 if (!impl->cached_frame_)  
DCB 74 - 9 impl->cached_frame_ = p;  
75 - else  
DUB 76 - ::operator delete(p);  
DCB 77 - 9 }  
78 -  
DCB 79 - 9 strand_invoker get_return_object() noexcept  
DCB 80 - 9 { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }  
81 -  
DCB 82 - 9 std::suspend_always initial_suspend() noexcept { return {}; }  
DCB 83 - 9 std::suspend_never final_suspend() noexcept { return {}; }  
DCB 84 - 9 void return_void() noexcept {}  
DUB 85 - void unhandled_exception() { std::terminate(); }  
86 - };  
87 -  
88 - std::coroutine_handle<promise_type> h_;  
89 - };  
90   24  
91 - //---------------------------------------------------------- 25 + /** Concrete strand_service.
92   26  
93 - /** Concrete implementation of strand_service. 27 + Holds a shared mutex pool (193 entries), a linked list of live
  28 + impls (for shutdown traversal), and a single-slot invoker
  29 + coroutine frame cache shared across all strands of this service.
94   30  
95 - Holds the fixed pool of strand_impl objects. 31 + The dispatch helpers (`enqueue`, `dispatch_pending`, etc.) are
  32 + public so the namespace-scope `make_invoker` coroutine and the
  33 + `strand_service` static methods can call them without friendship.
96   */ 34   */
97   class strand_service_impl : public strand_service 35   class strand_service_impl : public strand_service
98   { 36   {
99 - static constexpr std::size_t num_impls = 211; 37 + public:
  38 + static constexpr std::size_t num_mutexes = 193;
100 - strand_impl impls_[num_impls];  
101 - std::size_t salt_ = 0;  
102   39  
103   std::mutex mutex_; 40   std::mutex mutex_;
  41 + std::size_t salt_ = 0;
  42 + std::shared_ptr<std::mutex> mutexes_[num_mutexes];
  43 + intrusive_list<strand_impl> impl_list_;
  44 + std::atomic<void*> invoker_frame_cache_{nullptr};
104 - public:  
105   45  
106   explicit 46   explicit
HITCBC 107   19 strand_service_impl(execution_context&) 47   30 strand_service_impl(execution_context&)
HITCBC 108   4028 { 48   30 {
HITCBC 109   19 } 49   30 }
110   50  
111 - strand_impl* 51 + std::shared_ptr<strand_impl>
HITCBC 112 - 23 get_implementation() override 52 + 11442 create_implementation() override
113   { 53   {
HITGNC   54 + 11442 auto new_impl = std::make_shared<strand_impl>();
  55 +
DCB 114 - 23 std::size_t index = salt_++;  
DCB 115 - 23 index = index % num_impls;  
DCB 116 - 23 return &impls_[index];  
DCB 117 - 23 }  
HITCBC 118   23 std::lock_guard<std::mutex> lock(mutex_); 56   11442 std::lock_guard<std::mutex> lock(mutex_);
119   57  
HITGIC 120 - protected: 58 + 11442 std::size_t s = salt_++;
HITGIC 121 - void 59 + 11442 std::size_t idx = reinterpret_cast<std::size_t>(new_impl.get());
HITCBC 122 - 19 shutdown() override 60 + 11442 idx += idx >> 3;
HITGIC 123 - { 61 + 11442 idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2);
HITCBC 124 - 4028 for(std::size_t i = 0; i < num_impls; ++i) 62 + 11442 idx %= num_mutexes;
HITGIC 125 - { 63 + 11442 if(!mutexes_[idx])
HITCBC 126 - 4009 std::lock_guard<std::mutex> lock(impls_[i].mutex_); 64 + 670 mutexes_[idx] = std::make_shared<std::mutex>();
HITCBC 127 - 4009 impls_[i].locked_ = true; 65 + 11442 new_impl->mutex_ = mutexes_[idx].get();
128   66  
HITCBC 129 - 4009 if(impls_[i].cached_frame_) 67 + 11442 impl_list_.push_back(new_impl.get());
HITGIC 130 - { 68 + 11442 new_impl->service_.store(this, std::memory_order_release);
ECB 131 - 7 ::operator delete(impls_[i].cached_frame_); 69 +
HITCBC 132 - 7 impls_[i].cached_frame_ = nullptr; 70 + 22884 return new_impl;
133 - }  
DCB 134 - 4009 }  
HITCBC 135   19 } 71   11442 }
136 - private:  
137   72  
138   static bool 73   static bool
HITCBC 139 - 322 enqueue(strand_impl& impl, std::coroutine_handle<> h) 74 + 30340 enqueue(strand_impl& impl, continuation& c)
140   { 75   {
HITCBC 141 - 322 std::lock_guard<std::mutex> lock(impl.mutex_); 76 + 30340 std::lock_guard<std::mutex> lock(*impl.mutex_);
HITCBC 142 - 322 impl.pending_.push(h); 77 + 30340 impl.pending_.push(c);
HITCBC 143   322 if(!impl.locked_) 78   30340 if(!impl.locked_)
144   { 79   {
HITCBC 145   9 impl.locked_ = true; 80   19593 impl.locked_ = true;
HITCBC 146   9 return true; 81   19593 return true;
147   } 82   }
HITCBC 148   313 return false; 83   10747 return false;
HITCBC 149   322 } 84   30340 }
150   85  
151   static void 86   static void
HITCBC 152   14 dispatch_pending(strand_impl& impl) 87   20474 dispatch_pending(strand_impl& impl)
153   { 88   {
HITCBC 154   14 strand_queue::taken_batch batch; 89   20474 strand_queue::taken_batch batch;
155   { 90   {
HITCBC 156 - 14 std::lock_guard<std::mutex> lock(impl.mutex_); 91 + 20474 std::lock_guard<std::mutex> lock(*impl.mutex_);
HITCBC 157   14 batch = impl.pending_.take_all(); 92   20474 batch = impl.pending_.take_all();
HITCBC 158   14 } 93   20474 }
HITCBC 159   14 impl.pending_.dispatch_batch(batch); 94   20474 impl.pending_.dispatch_batch(batch);
HITCBC 160   14 } 95   20474 }
161   96  
162   static bool 97   static bool
HITCBC 163   14 try_unlock(strand_impl& impl) 98   20474 try_unlock(strand_impl& impl)
164   { 99   {
HITCBC 165 - 14 std::lock_guard<std::mutex> lock(impl.mutex_); 100 + 20474 std::lock_guard<std::mutex> lock(*impl.mutex_);
HITCBC 166   14 if(impl.pending_.empty()) 101   20474 if(impl.pending_.empty())
167   { 102   {
HITCBC 168   9 impl.locked_ = false; 103   19593 impl.locked_ = false;
HITCBC 169   9 return true; 104   19593 return true;
170   } 105   }
HITCBC 171   5 return false; 106   881 return false;
HITCBC 172   14 } 107   20474 }
173   108  
174   static void 109   static void
HITCBC 175   14 set_dispatch_thread(strand_impl& impl) noexcept 110   20474 set_dispatch_thread(strand_impl& impl) noexcept
176   { 111   {
HITCBC 177   14 impl.dispatch_thread_.store(std::this_thread::get_id()); 112   20474 impl.dispatch_thread_.store(std::this_thread::get_id());
HITCBC 178   14 } 113   20474 }
179   114  
180   static void 115   static void
HITCBC 181   9 clear_dispatch_thread(strand_impl& impl) noexcept 116   19593 clear_dispatch_thread(strand_impl& impl) noexcept
182   { 117   {
HITCBC 183   9 impl.dispatch_thread_.store(std::thread::id{}); 118   19593 impl.dispatch_thread_.store(std::thread::id{});
HITCBC 184   9 } 119   19593 }
185   120  
186 - // Loops until queue empty (aggressive). Alternative: per-batch fairness 121 + // Defined below; needs strand_invoker complete.
187 - // (repost after each batch to let other work run) - explore if starvation observed. 122 + static void
188 - static strand_invoker 123 + post_invoker(std::shared_ptr<strand_impl> impl, executor_ref ex);
ECB 189 - 9 make_invoker(strand_impl& impl) 124 +
  125 + protected:
  126 + void
HITGNC   127 + 30 shutdown() override
190   { 128   {
HITGIC 191 - strand_impl* p = &impl; 129 + 30 std::lock_guard<std::mutex> lock(mutex_);
HITGIC 192 - for(;;) 130 + 30 while(auto* p = impl_list_.pop_front())
193   { 131   {
MISUIC 194 - set_dispatch_thread(*p); 132 + std::lock_guard<std::mutex> impl_lock(*p->mutex_);
MISUIC 195 - dispatch_pending(*p); 133 + p->locked_ = true;
MISUIC 196 - if(try_unlock(*p)) 134 + p->service_.store(nullptr, std::memory_order_release);
197 - {  
198 - clear_dispatch_thread(*p);  
199 - co_return;  
200 - }  
MISUIC 201   } 135   }
  136 +
HITGNC   137 + 30 void* fp = invoker_frame_cache_.exchange(
  138 + kCacheClosed, std::memory_order_acq_rel);
HITGNC   139 + 30 if(fp) ::operator delete(fp);
HITCBC 202   18 } 140   30 }
  141 + };
203   142  
204 - friend class strand_service; 143 + /** Invoker coroutine that drains a strand's pending queue.
  144 +
  145 + Runs once the strand transitions from unlocked to locked. Holds
  146 + the impl alive via the coroutine parameter (a shared_ptr in the
  147 + coroutine frame), so user code may drop its strand handle while
  148 + the invoker is mid-flight.
  149 +
  150 + The frame's allocator recycles a single per-service slot. The
  151 + trailer points at the service (lifetime: execution_context),
  152 + NOT the impl (lifetime: per-strand), so operator delete is
  153 + safe even after the impl has been destroyed.
  154 + */
  155 + struct strand_invoker
  156 + {
  157 + struct promise_type
  158 + {
  159 + // Stored in the coroutine frame so its address is stable for
  160 + // posting to the inner executor.
  161 + continuation self_;
  162 +
  163 + void*
HITGNC   164 + 19593 operator new(
  165 + std::size_t n,
  166 + std::shared_ptr<strand_impl> const& impl)
  167 + {
HITGNC   168 + 19593 auto* svc = impl->service_.load(std::memory_order_acquire);
HITGNC   169 + 19593 constexpr auto A = alignof(strand_service_impl*);
HITGNC   170 + 19593 std::size_t padded = (n + A - 1) & ~(A - 1);
HITGNC   171 + 19593 std::size_t total = padded + sizeof(strand_service_impl*);
  172 +
HITGNC   173 + 19593 void* p = svc->invoker_frame_cache_.exchange(
  174 + nullptr, std::memory_order_acquire);
HITGNC   175 + 19593 if(!p || p == kCacheClosed)
HITGNC   176 + 6761 p = ::operator new(total);
  177 +
HITGNC   178 + 19593 *reinterpret_cast<strand_service_impl**>(
HITGNC   179 + 19593 static_cast<char*>(p) + padded) = svc;
HITGNC   180 + 19593 return p;
  181 + }
  182 +
  183 + void
HITGNC   184 + 19593 operator delete(void* p, std::size_t n) noexcept
  185 + {
HITGNC   186 + 19593 constexpr auto A = alignof(strand_service_impl*);
HITGNC   187 + 19593 std::size_t padded = (n + A - 1) & ~(A - 1);
HITGNC   188 + 19593 auto* svc = *reinterpret_cast<strand_service_impl**>(
  189 + static_cast<char*>(p) + padded);
  190 +
HITGNC   191 + 19593 void* expected = nullptr;
HITGNC   192 + 19593 if(!svc->invoker_frame_cache_.compare_exchange_strong(
  193 + expected, p, std::memory_order_release))
HITGNC   194 + 6744 ::operator delete(p);
HITGNC   195 + 19593 }
  196 +
  197 + strand_invoker
HITGNC   198 + 19593 get_return_object() noexcept
  199 + {
HITGNC   200 + 19593 return {std::coroutine_handle<promise_type>::from_promise(*this)};
  201 + }
  202 +
HITGNC   203 + 19593 std::suspend_always initial_suspend() noexcept { return {}; }
HITGNC   204 + 19593 std::suspend_never final_suspend() noexcept { return {}; }
HITGNC   205 + 19593 void return_void() noexcept {}
MISUNC   206 + void unhandled_exception() { std::terminate(); }
  207 + };
  208 +
  209 + std::coroutine_handle<promise_type> h_;
205   }; 210   };
206   211  
207 - //---------------------------------------------------------- 212 + // The by-value parameter lives in the coroutine frame for the
  213 + // invoker's lifetime, keeping the impl alive past any user-side
  214 + // strand drop.
  215 + static
  216 + strand_invoker
HITGNC   217 + 19593 make_invoker(std::shared_ptr<strand_impl> impl)
  218 + {
  219 + auto* p = impl.get();
  220 + for(;;)
  221 + {
  222 + strand_service_impl::set_dispatch_thread(*p);
  223 + strand_service_impl::dispatch_pending(*p);
  224 + if(strand_service_impl::try_unlock(*p))
  225 + {
  226 + strand_service_impl::clear_dispatch_thread(*p);
  227 + co_return;
  228 + }
  229 + }
HITGNC   230 + 39186 }
  231 +
  232 + void
HITGNC   233 + 19593 strand_service_impl::post_invoker(
  234 + std::shared_ptr<strand_impl> impl,
  235 + executor_ref ex)
  236 + {
HITGNC   237 + 19593 auto invoker = make_invoker(std::move(impl));
HITGNC   238 + 19593 auto& self = invoker.h_.promise().self_;
HITGNC   239 + 19593 self.h = invoker.h_;
HITGNC   240 + 19593 ex.post(self);
HITGNC   241 + 19593 }
  242 +
HITGNC   243 + 22884 strand_impl::~strand_impl()
  244 + {
HITGNC   245 + 11442 auto* svc = service_.load(std::memory_order_acquire);
HITGNC   246 + 11442 if(!svc) return;
HITGNC   247 + 11442 std::lock_guard<std::mutex> lock(svc->mutex_);
HITGNC   248 + 11442 svc->impl_list_.remove(this);
HITGNC   249 + 11442 }
208   250  
HITCBC 209   19 strand_service:: 251   30 strand_service::
HITCBC 210   19 strand_service() 252   30 strand_service()
HITCBC 211   19 : service() 253   30 : service()
212   { 254   {
HITCBC 213   19 } 255   30 }
214   256  
HITCBC 215   19 strand_service:: 257   30 strand_service::
216   ~strand_service() = default; 258   ~strand_service() = default;
217   259  
218   bool 260   bool
HITCBC 219   2 strand_service:: 261   12 strand_service::
220   running_in_this_thread(strand_impl& impl) noexcept 262   running_in_this_thread(strand_impl& impl) noexcept
221   { 263   {
HITCBC 222   2 return impl.dispatch_thread_.load() == std::this_thread::get_id(); 264   12 return impl.dispatch_thread_.load() == std::this_thread::get_id();
223   } 265   }
224   266  
225   std::coroutine_handle<> 267   std::coroutine_handle<>
HITCBC 226   1 strand_service:: 268   8 strand_service::
227 - dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) 269 + dispatch(
  270 + std::shared_ptr<strand_impl> const& impl,
  271 + executor_ref ex,
  272 + continuation& c)
228   { 273   {
HITCBC 229 - 1 if(running_in_this_thread(impl)) 274 + 8 if(running_in_this_thread(*impl))
HITGBC 230 - return h; 275 + 3 return c.h;
231   276  
HITCBC 232 - 1 if(strand_service_impl::enqueue(impl, h)) 277 + 5 if(strand_service_impl::enqueue(*impl, c))
HITCBC 233 - 1 ex.post(strand_service_impl::make_invoker(impl).h_); 278 + 5 strand_service_impl::post_invoker(impl, ex);
HITCBC 234   1 return std::noop_coroutine(); 279   5 return std::noop_coroutine();
235   } 280   }
236   281  
237   void 282   void
HITCBC 238   321 strand_service:: 283   30335 strand_service::
239 - post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) 284 + post(
  285 + std::shared_ptr<strand_impl> const& impl,
  286 + executor_ref ex,
  287 + continuation& c)
240   { 288   {
HITCBC 241 - 321 if(strand_service_impl::enqueue(impl, h)) 289 + 30335 if(strand_service_impl::enqueue(*impl, c))
HITCBC 242 - 8 ex.post(strand_service_impl::make_invoker(impl).h_); 290 + 19588 strand_service_impl::post_invoker(impl, ex);
HITCBC 243   321 } 291   30335 }
244   292  
245   strand_service& 293   strand_service&
HITCBC 246   23 get_strand_service(execution_context& ctx) 294   11442 get_strand_service(execution_context& ctx)
247   { 295   {
HITCBC 248   23 return ctx.use_service<strand_service_impl>(); 296   11442 return ctx.use_service<strand_service_impl>();
249   } 297   }
250   298  
251   } // namespace detail 299   } // namespace detail
252   } // namespace capy 300   } // namespace capy
253   } // namespace boost 301   } // namespace boost