100.00% Lines (139/139) 100.00% Functions (29/29)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // Copyright (c) 2026 Michael Vandeberg 3   // Copyright (c) 2026 Michael Vandeberg
4   // 4   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 7   //
8   // Official repository: https://github.com/boostorg/capy 8   // Official repository: https://github.com/boostorg/capy
9   // 9   //
10   10  
11   #include <boost/capy/ex/thread_pool.hpp> 11   #include <boost/capy/ex/thread_pool.hpp>
12 - #include <boost/capy/detail/intrusive.hpp> 12 + #include <boost/capy/continuation.hpp>
  13 + #include <boost/capy/detail/thread_local_ptr.hpp>
  14 + #include <boost/capy/ex/frame_allocator.hpp>
13   #include <boost/capy/test/thread_name.hpp> 15   #include <boost/capy/test/thread_name.hpp>
  16 + #include <algorithm>
14   #include <atomic> 17   #include <atomic>
15   #include <condition_variable> 18   #include <condition_variable>
16   #include <cstdio> 19   #include <cstdio>
17   #include <mutex> 20   #include <mutex>
18   #include <thread> 21   #include <thread>
19   #include <vector> 22   #include <vector>
20   23  
21   /* 24   /*
22   Thread pool implementation using a shared work queue. 25   Thread pool implementation using a shared work queue.
23   26  
24 - Work items are coroutine handles wrapped in intrusive list nodes, stored 27 + Work items are continuations linked via their intrusive next pointer,
25 - in a single queue protected by a mutex. Worker threads wait on a 28 + stored in a single queue protected by a mutex. No per-post heap
26 - condition_variable until work is available or stop is requested. 29 + allocation: the continuation is owned by the caller and linked
  30 + directly. Worker threads wait on a condition_variable until work
  31 + is available or stop is requested.
27   32  
28   Threads are started lazily on first post() via std::call_once to avoid 33   Threads are started lazily on first post() via std::call_once to avoid
29   spawning threads for pools that are constructed but never used. Each 34   spawning threads for pools that are constructed but never used. Each
30   thread is named with a configurable prefix plus index for debugger 35   thread is named with a configurable prefix plus index for debugger
31   visibility. 36   visibility.
32   37  
33 - Shutdown sequence: stop() sets the stop flag and notifies all threads, 38 + Work tracking: on_work_started/on_work_finished maintain an atomic
34 - then the destructor joins threads and destroys any remaining queued 39 + outstanding_work_ counter. join() blocks until this counter reaches
35 - work without executing it. 40 + zero, then signals workers to stop and joins threads.
  41 +
  42 + Two shutdown paths:
  43 + - join(): waits for outstanding work to drain, then stops workers.
  44 + - stop(): immediately signals workers to exit; queued work is abandoned.
  45 + - Destructor: stop() then join() (abandon + wait for threads).
36   */ 46   */
37   47  
38   namespace boost { 48   namespace boost {
39   namespace capy { 49   namespace capy {
40   50  
41   //------------------------------------------------------------------------------ 51   //------------------------------------------------------------------------------
42   52  
43   class thread_pool::impl 53   class thread_pool::impl
44   { 54   {
45 - struct work : detail::intrusive_queue<work>::node 55 + // Identifies the pool owning the current worker thread, or
46 - { 56 + // nullptr if the calling thread is not a pool worker. Checked
47 - std::coroutine_handle<> h_; 57 + // by dispatch() to decide between symmetric transfer (inline
  58 + // resume) and post.
  59 + static inline detail::thread_local_ptr<impl const> current_;
48   60  
ECB 49 - 127 explicit work(std::coroutine_handle<> h) noexcept 61 + // Intrusive queue of continuations via continuation::next.
ECB 50 - 127 : h_(h) 62 + // No per-post allocation: the continuation is owned by the caller.
51 - { 63 + continuation* head_ = nullptr;
ECB 52 - 127 } 64 + continuation* tail_ = nullptr;
53   65  
HITCBC 54 - 127 void run() 66 + 20398 void push(continuation* c) noexcept
55 - { 67 + {
HITCBC 56 - 127 auto h = h_; 68 + 20398 c->next = nullptr;
HITCBC 57 - 127 delete this; 69 + 20398 if(tail_)
HITCBC 58 - 127 h.resume(); 70 + 2321 tail_->next = c;
ECB 59 - 127 } 71 + else
HITGNC   72 + 18077 head_ = c;
HITGNC   73 + 20398 tail_ = c;
HITGNC   74 + 20398 }
60   75  
HITGBC 61 - void destroy() 76 + 20564 continuation* pop() noexcept
62 - { 77 + {
HITGBC 63 - delete this; 78 + 20564 if(!head_)
HITGBC 64 - } 79 + 166 return nullptr;
HITGIC 65 - }; 80 + 20398 continuation* c = head_;
HITGNC   81 + 20398 head_ = head_->next;
HITGNC   82 + 20398 if(!head_)
HITGNC   83 + 18077 tail_ = nullptr;
HITGNC   84 + 20398 return c;
  85 + }
  86 +
HITGNC   87 + 40056 bool empty() const noexcept
  88 + {
HITGNC   89 + 40056 return head_ == nullptr;
  90 + }
66   91  
67   std::mutex mutex_; 92   std::mutex mutex_;
68 - std::condition_variable cv_; 93 + std::condition_variable work_cv_;
69 - detail::intrusive_queue<work> q_; 94 + std::condition_variable done_cv_;
70   std::vector<std::thread> threads_; 95   std::vector<std::thread> threads_;
71 - std::atomic<bool> stop_{false}; 96 + std::atomic<std::size_t> outstanding_work_{0};
  97 + bool stop_{false};
  98 + bool joined_{false};
72   std::size_t num_threads_; 99   std::size_t num_threads_;
73   char thread_name_prefix_[13]{}; // 12 chars max + null terminator 100   char thread_name_prefix_[13]{}; // 12 chars max + null terminator
74   std::once_flag start_flag_; 101   std::once_flag start_flag_;
75   102  
76   public: 103   public:
HITCBC 77 - 61 ~impl() 104 + 166 ~impl() = default;
  105 +
  106 + bool
HITGNC   107 + 355 running_in_this_thread() const noexcept
78   { 108   {
HITCBC 79 - 61 stop(); 109 + 355 return current_.get() == this;
ECB 80 - 99 for(auto& t : threads_) 110 + }
DCB 81 - 38 if(t.joinable())  
DCB 82 - 38 t.join();  
83   111  
ECB 84 - 61 while(auto* w = q_.pop()) 112 + // Destroy abandoned coroutine frames. Must be called
EUB 85 - w->destroy(); 113 + // before execution_context::shutdown()/destroy() so
  114 + // that suspended-frame destructors (e.g. delay_awaitable
  115 + // calling timer_service::cancel()) run while services
  116 + // are still valid.
  117 + void
HITGNC   118 + 166 drain_abandoned() noexcept
  119 + {
HITGNC   120 + 362 while(auto* c = pop())
  121 + {
HITGNC   122 + 196 auto h = c->h;
HITGNC   123 + 196 if(h && h != std::noop_coroutine())
HITGNC   124 + 145 h.destroy();
HITGNC   125 + 196 }
HITCBC 86   61 } 126   166 }
87   127  
HITCBC 88   61 impl(std::size_t num_threads, std::string_view thread_name_prefix) 128   166 impl(std::size_t num_threads, std::string_view thread_name_prefix)
HITCBC 89   61 : num_threads_(num_threads) 129   166 : num_threads_(num_threads)
90   { 130   {
HITCBC 91   61 if(num_threads_ == 0) 131   166 if(num_threads_ == 0)
HITCBC 92 - 2 num_threads_ = std::thread::hardware_concurrency(); 132 + 4 num_threads_ = std::max(
HITCBC 93 - 61 if(num_threads_ == 0) 133 + 2 std::thread::hardware_concurrency(), 1u);
DUB 94 - num_threads_ = 1;  
95   134  
96   // Truncate prefix to 12 chars, leaving room for up to 3-digit index. 135   // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
HITCBC 97   61 auto n = thread_name_prefix.copy(thread_name_prefix_, 12); 136   166 auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
HITCBC 98   61 thread_name_prefix_[n] = '\0'; 137   166 thread_name_prefix_[n] = '\0';
HITCBC 99   61 } 138   166 }
100   139  
101   void 140   void
HITCBC 102 - 127 post(std::coroutine_handle<> h) 141 + 20398 post(continuation& c)
103   { 142   {
DCB 104 - 127 auto* w = new work(h);  
HITCBC 105   127 ensure_started(); 143   20398 ensure_started();
106   { 144   {
HITCBC 107   127 std::lock_guard<std::mutex> lock(mutex_); 145   20398 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 108 - 127 q_.push(w); 146 + 20398 push(&c);
HITCBC 109   127 } 147   20398 }
HITCBC 110 - 127 cv_.notify_one(); 148 + 20398 work_cv_.notify_one();
HITGNC   149 + 20398 }
  150 +
  151 + void
HITGNC   152 + 347 on_work_started() noexcept
  153 + {
HITGNC   154 + 347 outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
HITGNC   155 + 347 }
  156 +
  157 + void
HITGNC   158 + 347 on_work_finished() noexcept
  159 + {
HITGNC   160 + 347 if(outstanding_work_.fetch_sub(
HITGNC   161 + 347 1, std::memory_order_acq_rel) == 1)
  162 + {
HITGNC   163 + 87 std::lock_guard<std::mutex> lock(mutex_);
HITGNC   164 + 87 if(joined_ && !stop_)
HITGNC   165 + 4 stop_ = true;
HITGNC   166 + 87 done_cv_.notify_all();
HITGNC   167 + 87 work_cv_.notify_all();
HITGNC   168 + 87 }
HITGNC   169 + 347 }
  170 +
  171 + void
HITGNC   172 + 178 join() noexcept
  173 + {
  174 + {
HITGNC   175 + 178 std::unique_lock<std::mutex> lock(mutex_);
HITGNC   176 + 178 if(joined_)
HITGNC   177 + 12 return;
HITGNC   178 + 166 joined_ = true;
  179 +
HITGNC   180 + 166 if(outstanding_work_.load(
HITGNC   181 + 166 std::memory_order_acquire) == 0)
  182 + {
HITGNC   183 + 109 stop_ = true;
HITGNC   184 + 109 work_cv_.notify_all();
  185 + }
  186 + else
  187 + {
HITGNC   188 + 57 done_cv_.wait(lock, [this]{
HITGNC   189 + 62 return stop_;
  190 + });
  191 + }
HITGNC   192 + 178 }
  193 +
HITGNC   194 + 359 for(auto& t : threads_)
HITGNC   195 + 193 if(t.joinable())
HITGNC   196 + 193 t.join();
ECB 111   127 } 197   }
112   198  
113   void 199   void
HITCBC 114   61 stop() noexcept 200   168 stop() noexcept
115   { 201   {
ECB 116 - 61 stop_.store(true, std::memory_order_release); 202 + {
HITCBC 117 - 61 cv_.notify_all(); 203 + 168 std::lock_guard<std::mutex> lock(mutex_);
HITGNC   204 + 168 stop_ = true;
HITGNC   205 + 168 }
HITGNC   206 + 168 work_cv_.notify_all();
HITGNC   207 + 168 done_cv_.notify_all();
HITCBC 118   61 } 208   168 }
119   209  
120   private: 210   private:
121   void 211   void
HITCBC 122   127 ensure_started() 212   20398 ensure_started()
123   { 213   {
HITCBC 124   127 std::call_once(start_flag_, [this]{ 214   20398 std::call_once(start_flag_, [this]{
HITCBC 125   22 threads_.reserve(num_threads_); 215   109 threads_.reserve(num_threads_);
HITCBC 126   60 for(std::size_t i = 0; i < num_threads_; ++i) 216   302 for(std::size_t i = 0; i < num_threads_; ++i)
HITCBC 127   76 threads_.emplace_back([this, i]{ run(i); }); 217   386 threads_.emplace_back([this, i]{ run(i); });
HITCBC 128   22 }); 218   109 });
HITCBC 129   127 } 219   20398 }
130   220  
131   void 221   void
HITCBC 132   38 run(std::size_t index) 222   193 run(std::size_t index)
133   { 223   {
134   // Build name; set_current_thread_name truncates to platform limits. 224   // Build name; set_current_thread_name truncates to platform limits.
135   char name[16]; 225   char name[16];
HITCBC 136   38 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index); 226   193 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
HITCBC 137   38 set_current_thread_name(name); 227   193 set_current_thread_name(name);
138   228  
  229 + // Mark this thread as a worker of this pool so dispatch()
  230 + // can symmetric-transfer when called from within pool work.
  231 + struct scoped_pool
  232 + {
HITGNC   233 + 193 scoped_pool(impl const* p) noexcept { current_.set(p); }
HITGNC   234 + 193 ~scoped_pool() noexcept { current_.set(nullptr); }
HITGNC   235 + 193 } guard(this);
  236 +
139   for(;;) 237   for(;;)
140   { 238   {
HITCBC 141 - 165 work* w = nullptr; 239 + 20395 continuation* c = nullptr;
142   { 240   {
HITCBC 143   165 std::unique_lock<std::mutex> lock(mutex_); 241   20395 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 144 - 165 cv_.wait(lock, [this]{ 242 + 20395 work_cv_.wait(lock, [this]{
HITCBC 145 - 321 return !q_.empty() || 243 + 59817 return !empty() ||
HITCBC 146 - 321 stop_.load(std::memory_order_acquire); 244 + 59817 stop_;
147   }); 245   });
HITCBC 148 - 165 if(stop_.load(std::memory_order_acquire) && q_.empty()) 246 + 20395 if(stop_)
HITCBC 149   76 return; 247   386 return;
HITCBC 150 - 127 w = q_.pop(); 248 + 20202 c = pop();
HITCBC 151   165 } 249   20395 }
HITCBC 152 - 127 if(w) 250 + 20202 if(c)
HITCBC 153 - 127 w->run(); 251 + 20202 safe_resume(c->h);
HITCBC 154   127 } 252   20202 }
HITGIC 155   } 253   193 }
156   }; 254   };
157   255  
158   //------------------------------------------------------------------------------ 256   //------------------------------------------------------------------------------
159   257  
HITCBC 160   61 thread_pool:: 258   166 thread_pool::
161   ~thread_pool() 259   ~thread_pool()
162   { 260   {
HITGNC   261 + 166 impl_->stop();
HITGNC   262 + 166 impl_->join();
HITGNC   263 + 166 impl_->drain_abandoned();
HITCBC 163   61 shutdown(); 264   166 shutdown();
HITCBC 164   61 destroy(); 265   166 destroy();
HITCBC 165   61 delete impl_; 266   166 delete impl_;
HITCBC 166   61 } 267   166 }
167   268  
HITCBC 168   61 thread_pool:: 269   166 thread_pool::
HITCBC 169   61 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix) 270   166 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
HITCBC 170   61 : impl_(new impl(num_threads, thread_name_prefix)) 271   166 : impl_(new impl(num_threads, thread_name_prefix))
171   { 272   {
HITCBC 172   61 this->set_frame_allocator(std::allocator<void>{}); 273   166 this->set_frame_allocator(std::allocator<void>{});
HITCBC 173   61 } 274   166 }
174   275  
175   void 276   void
HITGBC 176   thread_pool:: 277   12 thread_pool::
  278 + join() noexcept
  279 + {
HITGNC   280 + 12 impl_->join();
HITGNC   281 + 12 }
  282 +
  283 + void
HITGNC   284 + 2 thread_pool::
177   stop() noexcept 285   stop() noexcept
178   { 286   {
HITGBC 179   impl_->stop(); 287   2 impl_->stop();
HITGBC 180   } 288   2 }
181   289  
182   //------------------------------------------------------------------------------ 290   //------------------------------------------------------------------------------
183   291  
  292 + thread_pool::executor_type
HITGNC   293 + 11580 thread_pool::
  294 + get_executor() const noexcept
  295 + {
HITGNC   296 + 11580 return executor_type(
HITGNC   297 + 11580 const_cast<thread_pool&>(*this));
  298 + }
  299 +
184   void 300   void
HITCBC 185   127 thread_pool::executor_type:: 301   347 thread_pool::executor_type::
186 - post(std::coroutine_handle<> h) const 302 + on_work_started() const noexcept
187   { 303   {
HITCBC 188 - 127 pool_->impl_->post(h); 304 + 347 pool_->impl_->on_work_started();
HITGNC   305 + 347 }
  306 +
  307 + void
HITGNC   308 + 347 thread_pool::executor_type::
  309 + on_work_finished() const noexcept
  310 + {
HITGNC   311 + 347 pool_->impl_->on_work_finished();
HITGNC   312 + 347 }
  313 +
  314 + void
HITGNC   315 + 20056 thread_pool::executor_type::
  316 + post(continuation& c) const
  317 + {
HITGNC   318 + 20056 pool_->impl_->post(c);
HITGNC   319 + 20056 }
  320 +
  321 + std::coroutine_handle<>
HITGNC   322 + 355 thread_pool::executor_type::
  323 + dispatch(continuation& c) const
  324 + {
HITGNC   325 + 355 if(pool_->impl_->running_in_this_thread())
HITGNC   326 + 13 return c.h;
HITGNC   327 + 342 pool_->impl_->post(c);
HITGNC   328 + 342 return std::noop_coroutine();
ECB 189   127 } 329   }
190   330  
191   } // capy 331   } // capy
192   } // boost 332   } // boost