100.00% Lines (21/21) 100.00% Functions (5/5)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
  3 + // Copyright (c) 2026 Michael Vandeberg
3   // 4   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 7   //
7   // Official repository: https://github.com/cppalliance/capy 8   // Official repository: https://github.com/cppalliance/capy
8   // 9   //
9   10  
10   #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP 11   #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11   #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP 12   #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12   13  
  14 + #include <boost/capy/continuation.hpp>
13   #include <boost/capy/detail/config.hpp> 15   #include <boost/capy/detail/config.hpp>
14 - 16 + #include <boost/capy/ex/frame_allocator.hpp>
15 - #include <coroutine>  
16 - #include <cstddef>  
17 - #include <exception>  
18   17  
19   namespace boost { 18   namespace boost {
20   namespace capy { 19   namespace capy {
21   namespace detail { 20   namespace detail {
22   21  
23 - class strand_queue; 22 + /** Single-threaded intrusive FIFO of pending continuations.
24 -  
25 - //----------------------------------------------------------  
26 -  
27 - // Metadata stored before the coroutine frame  
28 - struct frame_prefix  
29 - {  
30 - frame_prefix* next;  
31 - strand_queue* queue;  
32 - std::size_t alloc_size;  
33 - };  
34 -  
35 - //----------------------------------------------------------  
36 -  
37 - /** Wrapper coroutine for strand queue dispatch operations.  
38 -  
39 - This coroutine wraps a target coroutine handle and resumes  
40 - it when dispatched. The wrapper ensures control returns to  
41 - the dispatch loop after the target suspends or completes.  
42 -  
43 - The promise contains an intrusive list node for queue  
44 - storage and supports a custom allocator that recycles  
45 - coroutine frames via a free list.  
46 - */  
47 - struct strand_op  
48 - {  
49 - struct promise_type  
50 - {  
51 - promise_type* next = nullptr;  
52 -  
53 - void*  
54 - operator new(  
55 - std::size_t size,  
56 - strand_queue& q,  
57 - std::coroutine_handle<void>);  
58 -  
59 - void  
60 - operator delete(void* p, std::size_t);  
61 -  
62 - strand_op  
DCB 63 - 322 get_return_object() noexcept  
64 - {  
DCB 65 - 322 return {std::coroutine_handle<promise_type>::from_promise(*this)};  
66 - }  
67 -  
68 - std::suspend_always  
DCB 69 - 322 initial_suspend() noexcept  
70 - {  
DCB 71 - 322 return {};  
72 - }  
73 -  
74 - std::suspend_always  
DCB 75 - 322 final_suspend() noexcept  
76 - {  
DCB 77 - 322 return {};  
78 - }  
79 -  
80 - void  
DCB 81 - 322 return_void() noexcept  
82 - {  
DCB 83 - 322 }  
84 -  
85 - void  
DUB 86 - unhandled_exception()  
87 - {  
DUB 88 - std::terminate();  
89 - }  
90 - };  
91 -  
92 - std::coroutine_handle<promise_type> h_;  
93 - };  
94 -  
95 - //----------------------------------------------------------  
96 -  
97 - /** Single-threaded dispatch queue for coroutine handles.  
98 -  
99 - This queue stores coroutine handles and resumes them  
100 - sequentially when dispatch() is called. Each pushed  
101 - handle is wrapped in a strand_op coroutine that ensures  
102 - control returns to the dispatch loop after the target  
103 - suspends or completes.  
104   23  
105 - The queue uses an intrusive singly-linked list through 24 + Links continuations directly through `continuation::next`, so
106 - the promise type to avoid separate node allocations. 25 + push() carries no per-item allocation.
107 - A free list recycles wrapper coroutine frames to reduce  
108 - allocation overhead during repeated push/dispatch cycles.  
109   26  
110   @par Thread Safety 27   @par Thread Safety
111 - This class is not thread-safe. All operations must be 28 + Not thread-safe. Caller must externally synchronize push() and
112 - called from a single thread. 29 + take_all(). dispatch_batch() does not touch queue state and may
  30 + run unlocked once the batch has been taken.
113   */ 31   */
114   class strand_queue 32   class strand_queue
115   { 33   {
116 - using promise_type = strand_op::promise_type; 34 + continuation* head_ = nullptr;
117 - 35 + continuation* tail_ = nullptr;
118 - promise_type* head_ = nullptr;  
119 - promise_type* tail_ = nullptr;  
120 - frame_prefix* free_list_ = nullptr;  
121 -  
122 - friend struct strand_op::promise_type;  
123 -  
124 - static  
125 - strand_op  
DCB 126 - 322 make_strand_op(  
127 - strand_queue& q,  
128 - std::coroutine_handle<void> target)  
129 - {  
130 - (void)q;  
131 - target.resume();  
132 - co_return;  
DCB 133 - 644 }  
134   36  
135   public: 37   public:
DCB 136 - 4009  
HITGIC 137   strand_queue() = default; 38   11442 strand_queue() = default;
138   strand_queue(strand_queue const&) = delete; 39   strand_queue(strand_queue const&) = delete;
139   strand_queue& operator=(strand_queue const&) = delete; 40   strand_queue& operator=(strand_queue const&) = delete;
140   41  
141 - /** Destructor. 42 + /** Returns true if there are no pending continuations. */
142 -  
143 - Destroys any pending wrappers without resuming them,  
144 - then frees all memory in the free list.  
145 - */  
DCB 146 - 4009 ~strand_queue()  
147 - {  
148 - // Destroy pending wrappers  
DCB 149 - 4009 while(head_)  
150 - {  
DUB 151 - promise_type* p = head_;  
DUB 152 - head_ = p->next;  
153 -  
DUB 154 - auto h = std::coroutine_handle<promise_type>::from_promise(*p);  
DUB 155 - h.destroy();  
156 - }  
157 -  
158 - // Free the free list memory  
DCB 159 - 4009 while(free_list_)  
160 - {  
DUB 161 - frame_prefix* prefix = free_list_;  
DUB 162 - free_list_ = prefix->next;  
DUB 163 - ::operator delete(prefix);  
164 - }  
DCB 165 - 4009 }  
166 -  
167 - /** Returns true if there are no pending operations.  
168 - */  
169   bool 43   bool
HITCBC 170   14 empty() const noexcept 44   20474 empty() const noexcept
171   { 45   {
HITCBC 172   14 return head_ == nullptr; 46   20474 return head_ == nullptr;
173   } 47   }
174   48  
175 - /** Push a coroutine handle to the queue. 49 + /** Push a continuation to the queue.
176 -  
177 - Creates a wrapper coroutine and appends it to the  
178 - queue. The wrapper will resume the target handle  
179 - when dispatch() processes it.  
180   50  
181 - @param h The coroutine handle to dispatch. 51 + @param c The continuation to enqueue; see `continuation`
  52 + for lifetime and aliasing requirements.
182   */ 53   */
183   void 54   void
HITCBC 184 - 322 push(std::coroutine_handle<void> h) 55 + 30340 push(continuation& c) noexcept
185   { 56   {
HITCBC 186 - 322 strand_op op = make_strand_op(*this, h); 57 + 30340 c.next = nullptr;
187 -  
DCB 188 - 322 promise_type* p = &op.h_.promise();  
DCB 189 - 322 p->next = nullptr;  
190 -  
HITCBC 191   322 if(tail_) 58   30340 if(tail_)
HITCBC 192 - 308 tail_->next = p; 59 + 9866 tail_->next = &c;
193   else 60   else
HITCBC 194 - 14 head_ = p; 61 + 20474 head_ = &c;
HITCBC 195 - 322 tail_ = p; 62 + 30340 tail_ = &c;
DCB 196 - 322 }  
197 -  
198 - /** Resume all queued coroutines in sequence.  
199 -  
200 - Processes each wrapper in FIFO order, resuming its  
201 - target coroutine. After each target suspends or  
202 - completes, the wrapper is destroyed and its frame  
203 - is added to the free list for reuse.  
204 -  
205 - Coroutines resumed during dispatch may push new  
206 - handles, which will also be processed in the same  
207 - dispatch call.  
208 -  
209 - @warning Not thread-safe. Do not call while another  
210 - thread may be calling push().  
211 - */  
212 - void  
213 - dispatch()  
214 - {  
215 - while(head_)  
216 - {  
217 - promise_type* p = head_;  
218 - head_ = p->next;  
219 - if(!head_)  
220 - tail_ = nullptr;  
221 -  
222 - auto h = std::coroutine_handle<promise_type>::from_promise(*p);  
223 - h.resume();  
224 - h.destroy();  
225 - }  
HITGIC 226   } 63   30340 }
227   64  
228   /** Batch of taken items for thread-safe dispatch. */ 65   /** Batch of taken items for thread-safe dispatch. */
229   struct taken_batch 66   struct taken_batch
230   { 67   {
231 - promise_type* head = nullptr; 68 + continuation* head = nullptr;
232 - promise_type* tail = nullptr; 69 + continuation* tail = nullptr;
233   }; 70   };
234   71  
235   /** Take all pending items atomically. 72   /** Take all pending items atomically.
236   73  
237 - Removes all items from the queue and returns them 74 + Removes all items from the queue and returns them as a
238 - as a batch. The queue is left empty. 75 + batch. The queue is left empty.
239   76  
240   @return The batch of taken items. 77   @return The batch of taken items.
241   */ 78   */
242   taken_batch 79   taken_batch
HITCBC 243   14 take_all() noexcept 80   20474 take_all() noexcept
244   { 81   {
HITCBC 245   14 taken_batch batch{head_, tail_}; 82   20474 taken_batch batch{head_, tail_};
HITCBC 246   14 head_ = tail_ = nullptr; 83   20474 head_ = tail_ = nullptr;
HITCBC 247   14 return batch; 84   20474 return batch;
248   } 85   }
249   86  
250 - /** Dispatch a batch of taken items. 87 + /** Resume each continuation in a taken batch.
  88 +
  89 + Advances past each node before resuming, since the
  90 + resumed coroutine may destroy the awaitable (and thus
  91 + the continuation) before control returns here.
251   92  
252   @param batch The batch to dispatch. 93   @param batch The batch to dispatch.
253   94  
254 - @note This is thread-safe w.r.t. push() because it doesn't 95 + @note Thread-safe with respect to push() because the queue
255 - access the queue's free_list_. Frames are deleted directly 96 + itself is not touched.
256 - rather than recycled.  
257   */ 97   */
258   static 98   static
259   void 99   void
HITCBC 260   14 dispatch_batch(taken_batch& batch) 100   20474 dispatch_batch(taken_batch& batch)
261   { 101   {
HITCBC 262   336 while(batch.head) 102   50814 while(batch.head)
263   { 103   {
HITCBC 264 - 322 promise_type* p = batch.head; 104 + 30340 continuation* c = batch.head;
HITCBC 265 - 322 batch.head = p->next; 105 + 30340 batch.head = c->next;
HITGIC 266 - 106 + 30340 safe_resume(c->h);
DCB 267 - 322 auto h = std::coroutine_handle<promise_type>::from_promise(*p);  
DCB 268 - 322 h.resume();  
269 - // Don't use h.destroy() - it would call operator delete which  
270 - // accesses the queue's free_list_ (race with push).  
271 - // Instead, manually free the frame without recycling.  
272 - // h.address() returns the frame base (what operator new returned).  
DCB 273 - 322 frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;  
DCB 274 - 322 ::operator delete(prefix);  
275   } 107   }
HITCBC 276   14 batch.tail = nullptr; 108   20474 batch.tail = nullptr;
HITCBC 277   14 } 109   20474 }
278 -  
279 - //----------------------------------------------------------  
280 -  
281 - inline  
282 - void*  
283 - strand_op::promise_type::operator new(  
DCB 284 - 322 std::size_t size,  
285 - strand_queue& q,  
286 - std::coroutine_handle<void>)  
287 - {  
288 - // Total size includes prefix  
289 - std::size_t alloc_size = size + sizeof(frame_prefix);  
DCB 290 - 322 void* raw;  
291 -  
292 - // Try to reuse from free list  
293 - if(q.free_list_)  
DCB 294 - 322 {  
295 - frame_prefix* prefix = q.free_list_;  
DUB 296 - q.free_list_ = prefix->next;  
DUB 297 - raw = prefix;  
DUB 298 - }  
299 - else  
300 - {  
301 - raw = ::operator new(alloc_size);  
DCB 302 - 322 }  
303 -  
304 - // Initialize prefix  
305 - frame_prefix* prefix = static_cast<frame_prefix*>(raw);  
DCB 306 - 322 prefix->next = nullptr;  
DCB 307 - 322 prefix->queue = &q;  
DCB 308 - 322 prefix->alloc_size = alloc_size;  
DCB 309 - 322  
310 - // Return pointer AFTER the prefix (this is where coroutine frame goes)  
311 - return prefix + 1;  
DCB 312 - 322 }  
313 -  
314 - inline  
315 - void  
316 - strand_op::promise_type::operator delete(void* p, std::size_t)  
DUB 317 - {  
318 - // Calculate back to get the prefix  
319 - frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;  
DUB 320 -  
321 - // Add to free list  
322 - prefix->next = prefix->queue->free_list_;  
DUB 323 - prefix->queue->free_list_ = prefix;  
DUB 324 - }  
EUB 325   }; 110   };
326   111  
327   } // namespace detail 112   } // namespace detail
328   } // namespace capy 113   } // namespace capy
329   } // namespace boost 114   } // namespace boost
330   115  
331   #endif 116   #endif