95.61% Lines (109/114)
95.83% Functions (23/24)
| TLA | Baseline | Branch | ||||||
|---|---|---|---|---|---|---|---|---|
| Line | Hits | Code | Line | Hits | Code | |||
| 1 | // | 1 | // | |||||
| 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | |||||
| 3 | // | 3 | // | |||||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |||||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |||||
| 6 | // | 6 | // | |||||
| 7 | // Official repository: https://github.com/cppalliance/capy | 7 | // Official repository: https://github.com/cppalliance/capy | |||||
| 8 | // | 8 | // | |||||
| 9 | 9 | |||||||
| 10 | - | #include "src/ex/detail/strand_queue.hpp" | 10 | + | #include "src/ex/detail/strand_impl.hpp" | |||
| 11 | #include <boost/capy/ex/detail/strand_service.hpp> | 11 | #include <boost/capy/ex/detail/strand_service.hpp> | |||||
| 12 | - | #include <atomic> | 12 | + | #include <boost/capy/continuation.hpp> | |||
| 13 | #include <coroutine> | 13 | #include <coroutine> | |||||
| 14 | - | #include <mutex> | 14 | + | #include <memory> | |||
| 15 | - | #include <thread> | ||||||
| 16 | #include <utility> | 15 | #include <utility> | |||||
| 17 | 16 | |||||||
| 18 | namespace boost { | 17 | namespace boost { | |||||
| 19 | namespace capy { | 18 | namespace capy { | |||||
| 20 | namespace detail { | 19 | namespace detail { | |||||
| 21 | 20 | |||||||
| 22 | - | //---------------------------------------------------------- | 21 | + | // Sentinel stored in invoker_frame_cache_ after shutdown to prevent | |||
| 23 | - | 22 | + | // in-flight invokers from repopulating a freed cache slot. | ||||
| 24 | - | /** Implementation state for a strand. | 23 | + | inline void* const kCacheClosed = reinterpret_cast<void*>(1); | |||
| 25 | - | |||||||
| 26 | - | Each strand_impl provides serialization for coroutines | ||||||
| 27 | - | dispatched through strands that share it. | ||||||
| 28 | - | */ | ||||||
| 29 | - | struct strand_impl | ||||||
| 30 | - | { | ||||||
| 31 | - | std::mutex mutex_; | ||||||
| 32 | - | strand_queue pending_; | ||||||
| 33 | - | bool locked_ = false; | ||||||
| 34 | - | std::atomic<std::thread::id> dispatch_thread_{}; | ||||||
| 35 | - | void* cached_frame_ = nullptr; | ||||||
| 36 | - | }; | ||||||
| 37 | - | |||||||
| 38 | - | //---------------------------------------------------------- | ||||||
| 39 | - | |||||||
| 40 | - | /** Invoker coroutine for strand dispatch. | ||||||
| 41 | - | |||||||
| 42 | - | Uses custom allocator to recycle frame - one allocation | ||||||
| 43 | - | per strand_impl lifetime, stored in trailer for recovery. | ||||||
| 44 | - | */ | ||||||
| 45 | - | struct strand_invoker | ||||||
| 46 | - | { | ||||||
| 47 | - | struct promise_type | ||||||
| 48 | - | { | ||||||
| DCB | 49 | - | 9 | void* operator new(std::size_t n, strand_impl& impl) | ||||
| 50 | - | { | ||||||
| DCB | 51 | - | 9 | constexpr auto A = alignof(strand_impl*); | ||||
| DCB | 52 | - | 9 | std::size_t padded = (n + A - 1) & ~(A - 1); | ||||
| DCB | 53 | - | 9 | std::size_t total = padded + sizeof(strand_impl*); | ||||
| 54 | - | |||||||
| DCB | 55 | - | 9 | void* p = impl.cached_frame_ | ||||
| DCB | 56 | - | 9 | ? std::exchange(impl.cached_frame_, nullptr) | ||||
| DCB | 57 | - | 7 | : ::operator new(total); | ||||
| 58 | - | |||||||
| 59 | - | // Trailer lets delete recover impl | ||||||
| DCB | 60 | - | 9 | *reinterpret_cast<strand_impl**>( | ||||
| DCB | 61 | - | 9 | static_cast<char*>(p) + padded) = &impl; | ||||
| DCB | 62 | - | 9 | return p; | ||||
| 63 | - | } | ||||||
| 64 | - | |||||||
| DCB | 65 | - | 9 | void operator delete(void* p, std::size_t n) noexcept | ||||
| 66 | - | { | ||||||
| DCB | 67 | - | 9 | constexpr auto A = alignof(strand_impl*); | ||||
| DCB | 68 | - | 9 | std::size_t padded = (n + A - 1) & ~(A - 1); | ||||
| 69 | - | |||||||
| DCB | 70 | - | 9 | auto* impl = *reinterpret_cast<strand_impl**>( | ||||
| 71 | - | static_cast<char*>(p) + padded); | ||||||
| 72 | - | |||||||
| DCB | 73 | - | 9 | if (!impl->cached_frame_) | ||||
| DCB | 74 | - | 9 | impl->cached_frame_ = p; | ||||
| 75 | - | else | ||||||
| DUB | 76 | - | ✗ | ::operator delete(p); | ||||
| DCB | 77 | - | 9 | } | ||||
| 78 | - | |||||||
| DCB | 79 | - | 9 | strand_invoker get_return_object() noexcept | ||||
| DCB | 80 | - | 9 | { return {std::coroutine_handle<promise_type>::from_promise(*this)}; } | ||||
| 81 | - | |||||||
| DCB | 82 | - | 9 | std::suspend_always initial_suspend() noexcept { return {}; } | ||||
| DCB | 83 | - | 9 | std::suspend_never final_suspend() noexcept { return {}; } | ||||
| DCB | 84 | - | 9 | void return_void() noexcept {} | ||||
| DUB | 85 | - | ✗ | void unhandled_exception() { std::terminate(); } | ||||
| 86 | - | }; | ||||||
| 87 | - | |||||||
| 88 | - | std::coroutine_handle<promise_type> h_; | ||||||
| 89 | - | }; | ||||||
| 90 | 24 | |||||||
| 91 | - | //---------------------------------------------------------- | 25 | + | /** Concrete strand_service. | |||
| 92 | 26 | |||||||
| 93 | - | /** Concrete implementation of strand_service. | 27 | + | Holds a shared mutex pool (193 entries), a linked list of live | |||
| 28 | + | impls (for shutdown traversal), and a single-slot invoker | ||||||
| 29 | + | coroutine frame cache shared across all strands of this service. | ||||||
| 94 | 30 | |||||||
| 95 | - | Holds the fixed pool of strand_impl objects. | 31 | + | The dispatch helpers (`enqueue`, `dispatch_pending`, etc.) are | |||
| 32 | + | public so the namespace-scope `make_invoker` coroutine and the | ||||||
| 33 | + | `strand_service` static methods can call them without friendship. | ||||||
| 96 | */ | 34 | */ | |||||
| 97 | class strand_service_impl : public strand_service | 35 | class strand_service_impl : public strand_service | |||||
| 98 | { | 36 | { | |||||
| 99 | - | static constexpr std::size_t num_impls = 211; | 37 | + | public: | |||
| 38 | + | static constexpr std::size_t num_mutexes = 193; | ||||||
| 100 | - | strand_impl impls_[num_impls]; | ||||||
| 101 | - | std::size_t salt_ = 0; | ||||||
| 102 | 39 | |||||||
| 103 | std::mutex mutex_; | 40 | std::mutex mutex_; | |||||
| 41 | + | std::size_t salt_ = 0; | ||||||
| 42 | + | std::shared_ptr<std::mutex> mutexes_[num_mutexes]; | ||||||
| 43 | + | intrusive_list<strand_impl> impl_list_; | ||||||
| 44 | + | std::atomic<void*> invoker_frame_cache_{nullptr}; | ||||||
| 104 | - | public: | ||||||
| 105 | 45 | |||||||
| 106 | explicit | 46 | explicit | |||||
| HITCBC | 107 | 19 | strand_service_impl(execution_context&) | 47 | 30 | strand_service_impl(execution_context&) | ||
| HITCBC | 108 | 4028 | { | 48 | 30 | { | ||
| HITCBC | 109 | 19 | } | 49 | 30 | } | ||
| 110 | 50 | |||||||
| 111 | - | strand_impl* | 51 | + | std::shared_ptr<strand_impl> | |||
| HITCBC | 112 | - | 23 | get_implementation() override | 52 | + | 11442 | create_implementation() override |
| 113 | { | 53 | { | |||||
| HITGNC | 54 | + | 11442 | auto new_impl = std::make_shared<strand_impl>(); | ||||
| 55 | + | |||||||
| DCB | 114 | - | 23 | std::size_t index = salt_++; | ||||
| DCB | 115 | - | 23 | index = index % num_impls; | ||||
| DCB | 116 | - | 23 | return &impls_[index]; | ||||
| DCB | 117 | - | 23 | } | ||||
| HITCBC | 118 | 23 | std::lock_guard<std::mutex> lock(mutex_); | 56 | 11442 | std::lock_guard<std::mutex> lock(mutex_); | ||
| 119 | 57 | |||||||
| HITGIC | 120 | - | protected: | 58 | + | 11442 | std::size_t s = salt_++; | |
| HITGIC | 121 | - | void | 59 | + | 11442 | std::size_t idx = reinterpret_cast<std::size_t>(new_impl.get()); | |
| HITCBC | 122 | - | 19 | shutdown() override | 60 | + | 11442 | idx += idx >> 3; |
| HITGIC | 123 | - | { | 61 | + | 11442 | idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2); | |
| HITCBC | 124 | - | 4028 | for(std::size_t i = 0; i < num_impls; ++i) | 62 | + | 11442 | idx %= num_mutexes; |
| HITGIC | 125 | - | { | 63 | + | 11442 | if(!mutexes_[idx]) | |
| HITCBC | 126 | - | 4009 | std::lock_guard<std::mutex> lock(impls_[i].mutex_); | 64 | + | 670 | mutexes_[idx] = std::make_shared<std::mutex>(); |
| HITCBC | 127 | - | 4009 | impls_[i].locked_ = true; | 65 | + | 11442 | new_impl->mutex_ = mutexes_[idx].get(); |
| 128 | 66 | |||||||
| HITCBC | 129 | - | 4009 | if(impls_[i].cached_frame_) | 67 | + | 11442 | impl_list_.push_back(new_impl.get()); |
| HITGIC | 130 | - | { | 68 | + | 11442 | new_impl->service_.store(this, std::memory_order_release); | |
| ECB | 131 | - | 7 | ::operator delete(impls_[i].cached_frame_); | 69 | + | ||
| HITCBC | 132 | - | 7 | impls_[i].cached_frame_ = nullptr; | 70 | + | 22884 | return new_impl; |
| 133 | - | } | ||||||
| DCB | 134 | - | 4009 | } | ||||
| HITCBC | 135 | 19 | } | 71 | 11442 | } | ||
| 136 | - | private: | ||||||
| 137 | 72 | |||||||
| 138 | static bool | 73 | static bool | |||||
| HITCBC | 139 | - | 322 | enqueue(strand_impl& impl, std::coroutine_handle<> h) | 74 | + | 30340 | enqueue(strand_impl& impl, continuation& c) |
| 140 | { | 75 | { | |||||
| HITCBC | 141 | - | 322 | std::lock_guard<std::mutex> lock(impl.mutex_); | 76 | + | 30340 | std::lock_guard<std::mutex> lock(*impl.mutex_); |
| HITCBC | 142 | - | 322 | impl.pending_.push(h); | 77 | + | 30340 | impl.pending_.push(c); |
| HITCBC | 143 | 322 | if(!impl.locked_) | 78 | 30340 | if(!impl.locked_) | ||
| 144 | { | 79 | { | |||||
| HITCBC | 145 | 9 | impl.locked_ = true; | 80 | 19593 | impl.locked_ = true; | ||
| HITCBC | 146 | 9 | return true; | 81 | 19593 | return true; | ||
| 147 | } | 82 | } | |||||
| HITCBC | 148 | 313 | return false; | 83 | 10747 | return false; | ||
| HITCBC | 149 | 322 | } | 84 | 30340 | } | ||
| 150 | 85 | |||||||
| 151 | static void | 86 | static void | |||||
| HITCBC | 152 | 14 | dispatch_pending(strand_impl& impl) | 87 | 20474 | dispatch_pending(strand_impl& impl) | ||
| 153 | { | 88 | { | |||||
| HITCBC | 154 | 14 | strand_queue::taken_batch batch; | 89 | 20474 | strand_queue::taken_batch batch; | ||
| 155 | { | 90 | { | |||||
| HITCBC | 156 | - | 14 | std::lock_guard<std::mutex> lock(impl.mutex_); | 91 | + | 20474 | std::lock_guard<std::mutex> lock(*impl.mutex_); |
| HITCBC | 157 | 14 | batch = impl.pending_.take_all(); | 92 | 20474 | batch = impl.pending_.take_all(); | ||
| HITCBC | 158 | 14 | } | 93 | 20474 | } | ||
| HITCBC | 159 | 14 | impl.pending_.dispatch_batch(batch); | 94 | 20474 | impl.pending_.dispatch_batch(batch); | ||
| HITCBC | 160 | 14 | } | 95 | 20474 | } | ||
| 161 | 96 | |||||||
| 162 | static bool | 97 | static bool | |||||
| HITCBC | 163 | 14 | try_unlock(strand_impl& impl) | 98 | 20474 | try_unlock(strand_impl& impl) | ||
| 164 | { | 99 | { | |||||
| HITCBC | 165 | - | 14 | std::lock_guard<std::mutex> lock(impl.mutex_); | 100 | + | 20474 | std::lock_guard<std::mutex> lock(*impl.mutex_); |
| HITCBC | 166 | 14 | if(impl.pending_.empty()) | 101 | 20474 | if(impl.pending_.empty()) | ||
| 167 | { | 102 | { | |||||
| HITCBC | 168 | 9 | impl.locked_ = false; | 103 | 19593 | impl.locked_ = false; | ||
| HITCBC | 169 | 9 | return true; | 104 | 19593 | return true; | ||
| 170 | } | 105 | } | |||||
| HITCBC | 171 | 5 | return false; | 106 | 881 | return false; | ||
| HITCBC | 172 | 14 | } | 107 | 20474 | } | ||
| 173 | 108 | |||||||
| 174 | static void | 109 | static void | |||||
| HITCBC | 175 | 14 | set_dispatch_thread(strand_impl& impl) noexcept | 110 | 20474 | set_dispatch_thread(strand_impl& impl) noexcept | ||
| 176 | { | 111 | { | |||||
| HITCBC | 177 | 14 | impl.dispatch_thread_.store(std::this_thread::get_id()); | 112 | 20474 | impl.dispatch_thread_.store(std::this_thread::get_id()); | ||
| HITCBC | 178 | 14 | } | 113 | 20474 | } | ||
| 179 | 114 | |||||||
| 180 | static void | 115 | static void | |||||
| HITCBC | 181 | 9 | clear_dispatch_thread(strand_impl& impl) noexcept | 116 | 19593 | clear_dispatch_thread(strand_impl& impl) noexcept | ||
| 182 | { | 117 | { | |||||
| HITCBC | 183 | 9 | impl.dispatch_thread_.store(std::thread::id{}); | 118 | 19593 | impl.dispatch_thread_.store(std::thread::id{}); | ||
| HITCBC | 184 | 9 | } | 119 | 19593 | } | ||
| 185 | 120 | |||||||
| 186 | - | // Loops until queue empty (aggressive). Alternative: per-batch fairness | 121 | + | // Defined below; needs strand_invoker complete. | |||
| 187 | - | // (repost after each batch to let other work run) - explore if starvation observed. | 122 | + | static void | |||
| 188 | - | static strand_invoker | 123 | + | post_invoker(std::shared_ptr<strand_impl> impl, executor_ref ex); | |||
| ECB | 189 | - | 9 | make_invoker(strand_impl& impl) | 124 | + | ||
| 125 | + | protected: | ||||||
| 126 | + | void | ||||||
| HITGNC | 127 | + | 30 | shutdown() override | ||||
| 190 | { | 128 | { | |||||
| HITGIC | 191 | - | strand_impl* p = &impl; | 129 | + | 30 | std::lock_guard<std::mutex> lock(mutex_); | |
| HITGIC | 192 | - | for(;;) | 130 | + | 30 | while(auto* p = impl_list_.pop_front()) | |
| 193 | { | 131 | { | |||||
| MISUIC | 194 | - | set_dispatch_thread(*p); | 132 | + | ✗ | std::lock_guard<std::mutex> impl_lock(*p->mutex_); | |
| MISUIC | 195 | - | dispatch_pending(*p); | 133 | + | ✗ | p->locked_ = true; | |
| MISUIC | 196 | - | if(try_unlock(*p)) | 134 | + | ✗ | p->service_.store(nullptr, std::memory_order_release); | |
| 197 | - | { | ||||||
| 198 | - | clear_dispatch_thread(*p); | ||||||
| 199 | - | co_return; | ||||||
| 200 | - | } | ||||||
| MISUIC | 201 | } | 135 | ✗ | } | |||
| 136 | + | |||||||
| HITGNC | 137 | + | 30 | void* fp = invoker_frame_cache_.exchange( | ||||
| 138 | + | kCacheClosed, std::memory_order_acq_rel); | ||||||
| HITGNC | 139 | + | 30 | if(fp) ::operator delete(fp); | ||||
| HITCBC | 202 | 18 | } | 140 | 30 | } | ||
| 141 | + | }; | ||||||
| 203 | 142 | |||||||
| 204 | - | friend class strand_service; | 143 | + | /** Invoker coroutine that drains a strand's pending queue. | |||
| 144 | + | |||||||
| 145 | + | Runs once the strand transitions from unlocked to locked. Holds | ||||||
| 146 | + | the impl alive via the coroutine parameter (a shared_ptr in the | ||||||
| 147 | + | coroutine frame), so user code may drop its strand handle while | ||||||
| 148 | + | the invoker is mid-flight. | ||||||
| 149 | + | |||||||
| 150 | + | The frame's allocator recycles a single per-service slot. The | ||||||
| 151 | + | trailer points at the service (lifetime: execution_context), | ||||||
| 152 | + | NOT the impl (lifetime: per-strand), so operator delete is | ||||||
| 153 | + | safe even after the impl has been destroyed. | ||||||
| 154 | + | */ | ||||||
| 155 | + | struct strand_invoker | ||||||
| 156 | + | { | ||||||
| 157 | + | struct promise_type | ||||||
| 158 | + | { | ||||||
| 159 | + | // Stored in the coroutine frame so its address is stable for | ||||||
| 160 | + | // posting to the inner executor. | ||||||
| 161 | + | continuation self_; | ||||||
| 162 | + | |||||||
| 163 | + | void* | ||||||
| HITGNC | 164 | + | 19593 | operator new( | ||||
| 165 | + | std::size_t n, | ||||||
| 166 | + | std::shared_ptr<strand_impl> const& impl) | ||||||
| 167 | + | { | ||||||
| HITGNC | 168 | + | 19593 | auto* svc = impl->service_.load(std::memory_order_acquire); | ||||
| HITGNC | 169 | + | 19593 | constexpr auto A = alignof(strand_service_impl*); | ||||
| HITGNC | 170 | + | 19593 | std::size_t padded = (n + A - 1) & ~(A - 1); | ||||
| HITGNC | 171 | + | 19593 | std::size_t total = padded + sizeof(strand_service_impl*); | ||||
| 172 | + | |||||||
| HITGNC | 173 | + | 19593 | void* p = svc->invoker_frame_cache_.exchange( | ||||
| 174 | + | nullptr, std::memory_order_acquire); | ||||||
| HITGNC | 175 | + | 19593 | if(!p || p == kCacheClosed) | ||||
| HITGNC | 176 | + | 6761 | p = ::operator new(total); | ||||
| 177 | + | |||||||
| HITGNC | 178 | + | 19593 | *reinterpret_cast<strand_service_impl**>( | ||||
| HITGNC | 179 | + | 19593 | static_cast<char*>(p) + padded) = svc; | ||||
| HITGNC | 180 | + | 19593 | return p; | ||||
| 181 | + | } | ||||||
| 182 | + | |||||||
| 183 | + | void | ||||||
| HITGNC | 184 | + | 19593 | operator delete(void* p, std::size_t n) noexcept | ||||
| 185 | + | { | ||||||
| HITGNC | 186 | + | 19593 | constexpr auto A = alignof(strand_service_impl*); | ||||
| HITGNC | 187 | + | 19593 | std::size_t padded = (n + A - 1) & ~(A - 1); | ||||
| HITGNC | 188 | + | 19593 | auto* svc = *reinterpret_cast<strand_service_impl**>( | ||||
| 189 | + | static_cast<char*>(p) + padded); | ||||||
| 190 | + | |||||||
| HITGNC | 191 | + | 19593 | void* expected = nullptr; | ||||
| HITGNC | 192 | + | 19593 | if(!svc->invoker_frame_cache_.compare_exchange_strong( | ||||
| 193 | + | expected, p, std::memory_order_release)) | ||||||
| HITGNC | 194 | + | 6744 | ::operator delete(p); | ||||
| HITGNC | 195 | + | 19593 | } | ||||
| 196 | + | |||||||
| 197 | + | strand_invoker | ||||||
| HITGNC | 198 | + | 19593 | get_return_object() noexcept | ||||
| 199 | + | { | ||||||
| HITGNC | 200 | + | 19593 | return {std::coroutine_handle<promise_type>::from_promise(*this)}; | ||||
| 201 | + | } | ||||||
| 202 | + | |||||||
| HITGNC | 203 | + | 19593 | std::suspend_always initial_suspend() noexcept { return {}; } | ||||
| HITGNC | 204 | + | 19593 | std::suspend_never final_suspend() noexcept { return {}; } | ||||
| HITGNC | 205 | + | 19593 | void return_void() noexcept {} | ||||
| MISUNC | 206 | + | ✗ | void unhandled_exception() { std::terminate(); } | ||||
| 207 | + | }; | ||||||
| 208 | + | |||||||
| 209 | + | std::coroutine_handle<promise_type> h_; | ||||||
| 205 | }; | 210 | }; | |||||
| 206 | 211 | |||||||
| 207 | - | //---------------------------------------------------------- | 212 | + | // The by-value parameter lives in the coroutine frame for the | |||
| 213 | + | // invoker's lifetime, keeping the impl alive past any user-side | ||||||
| 214 | + | // strand drop. | ||||||
| 215 | + | static | ||||||
| 216 | + | strand_invoker | ||||||
| HITGNC | 217 | + | 19593 | make_invoker(std::shared_ptr<strand_impl> impl) | ||||
| 218 | + | { | ||||||
| 219 | + | auto* p = impl.get(); | ||||||
| 220 | + | for(;;) | ||||||
| 221 | + | { | ||||||
| 222 | + | strand_service_impl::set_dispatch_thread(*p); | ||||||
| 223 | + | strand_service_impl::dispatch_pending(*p); | ||||||
| 224 | + | if(strand_service_impl::try_unlock(*p)) | ||||||
| 225 | + | { | ||||||
| 226 | + | strand_service_impl::clear_dispatch_thread(*p); | ||||||
| 227 | + | co_return; | ||||||
| 228 | + | } | ||||||
| 229 | + | } | ||||||
| HITGNC | 230 | + | 39186 | } | ||||
| 231 | + | |||||||
| 232 | + | void | ||||||
| HITGNC | 233 | + | 19593 | strand_service_impl::post_invoker( | ||||
| 234 | + | std::shared_ptr<strand_impl> impl, | ||||||
| 235 | + | executor_ref ex) | ||||||
| 236 | + | { | ||||||
| HITGNC | 237 | + | 19593 | auto invoker = make_invoker(std::move(impl)); | ||||
| HITGNC | 238 | + | 19593 | auto& self = invoker.h_.promise().self_; | ||||
| HITGNC | 239 | + | 19593 | self.h = invoker.h_; | ||||
| HITGNC | 240 | + | 19593 | ex.post(self); | ||||
| HITGNC | 241 | + | 19593 | } | ||||
| 242 | + | |||||||
| HITGNC | 243 | + | 22884 | strand_impl::~strand_impl() | ||||
| 244 | + | { | ||||||
| HITGNC | 245 | + | 11442 | auto* svc = service_.load(std::memory_order_acquire); | ||||
| HITGNC | 246 | + | 11442 | if(!svc) return; | ||||
| HITGNC | 247 | + | 11442 | std::lock_guard<std::mutex> lock(svc->mutex_); | ||||
| HITGNC | 248 | + | 11442 | svc->impl_list_.remove(this); | ||||
| HITGNC | 249 | + | 11442 | } | ||||
| 208 | 250 | |||||||
| HITCBC | 209 | 19 | strand_service:: | 251 | 30 | strand_service:: | ||
| HITCBC | 210 | 19 | strand_service() | 252 | 30 | strand_service() | ||
| HITCBC | 211 | 19 | : service() | 253 | 30 | : service() | ||
| 212 | { | 254 | { | |||||
| HITCBC | 213 | 19 | } | 255 | 30 | } | ||
| 214 | 256 | |||||||
| HITCBC | 215 | 19 | strand_service:: | 257 | 30 | strand_service:: | ||
| 216 | ~strand_service() = default; | 258 | ~strand_service() = default; | |||||
| 217 | 259 | |||||||
| 218 | bool | 260 | bool | |||||
| HITCBC | 219 | 2 | strand_service:: | 261 | 12 | strand_service:: | ||
| 220 | running_in_this_thread(strand_impl& impl) noexcept | 262 | running_in_this_thread(strand_impl& impl) noexcept | |||||
| 221 | { | 263 | { | |||||
| HITCBC | 222 | 2 | return impl.dispatch_thread_.load() == std::this_thread::get_id(); | 264 | 12 | return impl.dispatch_thread_.load() == std::this_thread::get_id(); | ||
| 223 | } | 265 | } | |||||
| 224 | 266 | |||||||
| 225 | std::coroutine_handle<> | 267 | std::coroutine_handle<> | |||||
| HITCBC | 226 | 1 | strand_service:: | 268 | 8 | strand_service:: | ||
| 227 | - | dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) | 269 | + | dispatch( | |||
| 270 | + | std::shared_ptr<strand_impl> const& impl, | ||||||
| 271 | + | executor_ref ex, | ||||||
| 272 | + | continuation& c) | ||||||
| 228 | { | 273 | { | |||||
| HITCBC | 229 | - | 1 | if(running_in_this_thread(impl)) | 274 | + | 8 | if(running_in_this_thread(*impl)) |
| HITGBC | 230 | - | ✗ | return h; | 275 | + | 3 | return c.h; |
| 231 | 276 | |||||||
| HITCBC | 232 | - | 1 | if(strand_service_impl::enqueue(impl, h)) | 277 | + | 5 | if(strand_service_impl::enqueue(*impl, c)) |
| HITCBC | 233 | - | 1 | ex.post(strand_service_impl::make_invoker(impl).h_); | 278 | + | 5 | strand_service_impl::post_invoker(impl, ex); |
| HITCBC | 234 | 1 | return std::noop_coroutine(); | 279 | 5 | return std::noop_coroutine(); | ||
| 235 | } | 280 | } | |||||
| 236 | 281 | |||||||
| 237 | void | 282 | void | |||||
| HITCBC | 238 | 321 | strand_service:: | 283 | 30335 | strand_service:: | ||
| 239 | - | post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) | 284 | + | post( | |||
| 285 | + | std::shared_ptr<strand_impl> const& impl, | ||||||
| 286 | + | executor_ref ex, | ||||||
| 287 | + | continuation& c) | ||||||
| 240 | { | 288 | { | |||||
| HITCBC | 241 | - | 321 | if(strand_service_impl::enqueue(impl, h)) | 289 | + | 30335 | if(strand_service_impl::enqueue(*impl, c)) |
| HITCBC | 242 | - | 8 | ex.post(strand_service_impl::make_invoker(impl).h_); | 290 | + | 19588 | strand_service_impl::post_invoker(impl, ex); |
| HITCBC | 243 | 321 | } | 291 | 30335 | } | ||
| 244 | 292 | |||||||
| 245 | strand_service& | 293 | strand_service& | |||||
| HITCBC | 246 | 23 | get_strand_service(execution_context& ctx) | 294 | 11442 | get_strand_service(execution_context& ctx) | ||
| 247 | { | 295 | { | |||||
| HITCBC | 248 | 23 | return ctx.use_service<strand_service_impl>(); | 296 | 11442 | return ctx.use_service<strand_service_impl>(); | ||
| 249 | } | 297 | } | |||||
| 250 | 298 | |||||||
| 251 | } // namespace detail | 299 | } // namespace detail | |||||
| 252 | } // namespace capy | 300 | } // namespace capy | |||||
| 253 | } // namespace boost | 301 | } // namespace boost | |||||