91.11% Lines (123/135) 96.77% Functions (30/31)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP 10   #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11   #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP 11   #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/detail/await_suspend_helper.hpp> 14   #include <boost/capy/detail/await_suspend_helper.hpp>
15   #include <boost/capy/buffers.hpp> 15   #include <boost/capy/buffers.hpp>
16 - #include <boost/capy/buffers/buffer_array.hpp> 16 + #include <boost/capy/detail/buffer_array.hpp>
17   #include <boost/capy/buffers/buffer_param.hpp> 17   #include <boost/capy/buffers/buffer_param.hpp>
18   #include <boost/capy/concept/io_awaitable.hpp> 18   #include <boost/capy/concept/io_awaitable.hpp>
19   #include <boost/capy/concept/read_source.hpp> 19   #include <boost/capy/concept/read_source.hpp>
20   #include <boost/capy/ex/io_env.hpp> 20   #include <boost/capy/ex/io_env.hpp>
21   #include <boost/capy/io_result.hpp> 21   #include <boost/capy/io_result.hpp>
22   #include <boost/capy/io_task.hpp> 22   #include <boost/capy/io_task.hpp>
23   23  
24   #include <concepts> 24   #include <concepts>
25   #include <coroutine> 25   #include <coroutine>
26   #include <cstddef> 26   #include <cstddef>
  27 + #include <exception>
27   #include <new> 28   #include <new>
28   #include <span> 29   #include <span>
29   #include <stop_token> 30   #include <stop_token>
30   #include <system_error> 31   #include <system_error>
31   #include <utility> 32   #include <utility>
32   33  
33   namespace boost { 34   namespace boost {
34   namespace capy { 35   namespace capy {
35   36  
36   /** Type-erased wrapper for any ReadSource. 37   /** Type-erased wrapper for any ReadSource.
37   38  
38   This class provides type erasure for any type satisfying the 39   This class provides type erasure for any type satisfying the
39   @ref ReadSource concept, enabling runtime polymorphism for 40   @ref ReadSource concept, enabling runtime polymorphism for
40   source read operations. It uses cached awaitable storage to achieve 41   source read operations. It uses cached awaitable storage to achieve
41   zero steady-state allocation after construction. 42   zero steady-state allocation after construction.
42   43  
43   The wrapper supports two construction modes: 44   The wrapper supports two construction modes:
44   - **Owning**: Pass by value to transfer ownership. The wrapper 45   - **Owning**: Pass by value to transfer ownership. The wrapper
45   allocates storage and owns the source. 46   allocates storage and owns the source.
46   - **Reference**: Pass a pointer to wrap without ownership. The 47   - **Reference**: Pass a pointer to wrap without ownership. The
47   pointed-to source must outlive this wrapper. 48   pointed-to source must outlive this wrapper.
48   49  
49   @par Awaitable Preallocation 50   @par Awaitable Preallocation
50   The constructor preallocates storage for the type-erased awaitable. 51   The constructor preallocates storage for the type-erased awaitable.
51   This reserves all virtual address space at server startup 52   This reserves all virtual address space at server startup
52   so memory usage can be measured up front, rather than 53   so memory usage can be measured up front, rather than
53   allocating piecemeal as traffic arrives. 54   allocating piecemeal as traffic arrives.
54   55  
55   @par Immediate Completion 56   @par Immediate Completion
56   Operations complete immediately without suspending when the 57   Operations complete immediately without suspending when the
57   buffer sequence is empty, or when the underlying source's 58   buffer sequence is empty, or when the underlying source's
58   awaitable reports readiness via `await_ready`. 59   awaitable reports readiness via `await_ready`.
59   60  
60   @par Thread Safety 61   @par Thread Safety
61   Not thread-safe. Concurrent operations on the same wrapper 62   Not thread-safe. Concurrent operations on the same wrapper
62   are undefined behavior. 63   are undefined behavior.
63   64  
64   @par Example 65   @par Example
65   @code 66   @code
66   // Owning - takes ownership of the source 67   // Owning - takes ownership of the source
67   any_read_source rs(some_source{args...}); 68   any_read_source rs(some_source{args...});
68   69  
69   // Reference - wraps without ownership 70   // Reference - wraps without ownership
70   some_source source; 71   some_source source;
71   any_read_source rs(&source); 72   any_read_source rs(&source);
72   73  
73   mutable_buffer buf(data, size); 74   mutable_buffer buf(data, size);
74   auto [ec, n] = co_await rs.read(std::span(&buf, 1)); 75   auto [ec, n] = co_await rs.read(std::span(&buf, 1));
75   @endcode 76   @endcode
76   77  
77   @see any_read_stream, ReadSource 78   @see any_read_stream, ReadSource
78   */ 79   */
79   class any_read_source 80   class any_read_source
80   { 81   {
81   struct vtable; 82   struct vtable;
82   struct awaitable_ops; 83   struct awaitable_ops;
83   84  
84   template<ReadSource S> 85   template<ReadSource S>
85   struct vtable_for_impl; 86   struct vtable_for_impl;
86   87  
87   void* source_ = nullptr; 88   void* source_ = nullptr;
88   vtable const* vt_ = nullptr; 89   vtable const* vt_ = nullptr;
89   void* cached_awaitable_ = nullptr; 90   void* cached_awaitable_ = nullptr;
90   void* storage_ = nullptr; 91   void* storage_ = nullptr;
91   awaitable_ops const* active_ops_ = nullptr; 92   awaitable_ops const* active_ops_ = nullptr;
92   93  
93   public: 94   public:
94   /** Destructor. 95   /** Destructor.
95   96  
96   Destroys the owned source (if any) and releases the cached 97   Destroys the owned source (if any) and releases the cached
97   awaitable storage. 98   awaitable storage.
98   */ 99   */
99   ~any_read_source(); 100   ~any_read_source();
100   101  
101 - /** Default constructor. 102 + /** Construct a default instance.
102   103  
103   Constructs an empty wrapper. Operations on a default-constructed 104   Constructs an empty wrapper. Operations on a default-constructed
104   wrapper result in undefined behavior. 105   wrapper result in undefined behavior.
105   */ 106   */
106   any_read_source() = default; 107   any_read_source() = default;
107   108  
108   /** Non-copyable. 109   /** Non-copyable.
109   110  
110   The awaitable cache is per-instance and cannot be shared. 111   The awaitable cache is per-instance and cannot be shared.
111   */ 112   */
112   any_read_source(any_read_source const&) = delete; 113   any_read_source(any_read_source const&) = delete;
113   any_read_source& operator=(any_read_source const&) = delete; 114   any_read_source& operator=(any_read_source const&) = delete;
114   115  
115 - /** Move constructor. 116 + /** Construct by moving.
116   117  
117   Transfers ownership of the wrapped source (if owned) and 118   Transfers ownership of the wrapped source (if owned) and
118   cached awaitable storage from `other`. After the move, `other` is 119   cached awaitable storage from `other`. After the move, `other` is
119   in a default-constructed state. 120   in a default-constructed state.
120   121  
121   @param other The wrapper to move from. 122   @param other The wrapper to move from.
122   */ 123   */
HITCBC 123   1 any_read_source(any_read_source&& other) noexcept 124   1 any_read_source(any_read_source&& other) noexcept
HITCBC 124   1 : source_(std::exchange(other.source_, nullptr)) 125   1 : source_(std::exchange(other.source_, nullptr))
HITCBC 125   1 , vt_(std::exchange(other.vt_, nullptr)) 126   1 , vt_(std::exchange(other.vt_, nullptr))
HITCBC 126   1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr)) 127   1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
HITCBC 127   1 , storage_(std::exchange(other.storage_, nullptr)) 128   1 , storage_(std::exchange(other.storage_, nullptr))
HITCBC 128   1 , active_ops_(std::exchange(other.active_ops_, nullptr)) 129   1 , active_ops_(std::exchange(other.active_ops_, nullptr))
129   { 130   {
HITCBC 130   1 } 131   1 }
131   132  
132 - /** Move assignment operator. 133 + /** Assign by moving.
133   134  
134   Destroys any owned source and releases existing resources, 135   Destroys any owned source and releases existing resources,
135   then transfers ownership from `other`. 136   then transfers ownership from `other`.
136   137  
137   @param other The wrapper to move from. 138   @param other The wrapper to move from.
138   @return Reference to this wrapper. 139   @return Reference to this wrapper.
139   */ 140   */
140   any_read_source& 141   any_read_source&
141   operator=(any_read_source&& other) noexcept; 142   operator=(any_read_source&& other) noexcept;
142   143  
143   /** Construct by taking ownership of a ReadSource. 144   /** Construct by taking ownership of a ReadSource.
144   145  
145   Allocates storage and moves the source into this wrapper. 146   Allocates storage and moves the source into this wrapper.
146   The wrapper owns the source and will destroy it. 147   The wrapper owns the source and will destroy it.
147   148  
148   @param s The source to take ownership of. 149   @param s The source to take ownership of.
149   */ 150   */
150   template<ReadSource S> 151   template<ReadSource S>
151   requires (!std::same_as<std::decay_t<S>, any_read_source>) 152   requires (!std::same_as<std::decay_t<S>, any_read_source>)
152   any_read_source(S s); 153   any_read_source(S s);
153   154  
154   /** Construct by wrapping a ReadSource without ownership. 155   /** Construct by wrapping a ReadSource without ownership.
155   156  
156   Wraps the given source by pointer. The source must remain 157   Wraps the given source by pointer. The source must remain
157   valid for the lifetime of this wrapper. 158   valid for the lifetime of this wrapper.
158   159  
159   @param s Pointer to the source to wrap. 160   @param s Pointer to the source to wrap.
160   */ 161   */
161   template<ReadSource S> 162   template<ReadSource S>
162   any_read_source(S* s); 163   any_read_source(S* s);
163   164  
164   /** Check if the wrapper contains a valid source. 165   /** Check if the wrapper contains a valid source.
165   166  
166   @return `true` if wrapping a source, `false` if default-constructed 167   @return `true` if wrapping a source, `false` if default-constructed
167   or moved-from. 168   or moved-from.
168   */ 169   */
169   bool 170   bool
HITCBC 170   27 has_value() const noexcept 171   27 has_value() const noexcept
171   { 172   {
HITCBC 172   27 return source_ != nullptr; 173   27 return source_ != nullptr;
173   } 174   }
174   175  
175   /** Check if the wrapper contains a valid source. 176   /** Check if the wrapper contains a valid source.
176   177  
177   @return `true` if wrapping a source, `false` if default-constructed 178   @return `true` if wrapping a source, `false` if default-constructed
178   or moved-from. 179   or moved-from.
179   */ 180   */
180   explicit 181   explicit
HITCBC 181   8 operator bool() const noexcept 182   8 operator bool() const noexcept
182   { 183   {
HITCBC 183   8 return has_value(); 184   8 return has_value();
184   } 185   }
185   186  
186   /** Initiate a partial read operation. 187   /** Initiate a partial read operation.
187   188  
188 - Reads one or more bytes into the provided buffer sequence. 189 + Attempt to read up to `buffer_size( buffers )` bytes into
189 - May fill less than the full sequence. 190 + the provided buffer sequence. May fill less than the
  191 + full sequence.
190   192  
191   @param buffers The buffer sequence to read into. 193   @param buffers The buffer sequence to read into.
192   194  
193 - @return An awaitable yielding `(error_code,std::size_t)`. 195 + @return An awaitable that await-returns `(error_code,std::size_t)`.
194   196  
195   @par Immediate Completion 197   @par Immediate Completion
196   The operation completes immediately without suspending 198   The operation completes immediately without suspending
197   the calling coroutine when: 199   the calling coroutine when:
198   @li The buffer sequence is empty, returning `{error_code{}, 0}`. 200   @li The buffer sequence is empty, returning `{error_code{}, 0}`.
199   @li The underlying source's awaitable reports immediate 201   @li The underlying source's awaitable reports immediate
200   readiness via `await_ready`. 202   readiness via `await_ready`.
201   203  
202   @note This is a partial operation and may not process the 204   @note This is a partial operation and may not process the
203   entire buffer sequence. Use @ref read for guaranteed 205   entire buffer sequence. Use @ref read for guaranteed
204   complete transfer. 206   complete transfer.
205   207  
206   @par Preconditions 208   @par Preconditions
207   The wrapper must contain a valid source (`has_value() == true`). 209   The wrapper must contain a valid source (`has_value() == true`).
208   The caller must not call this function again after a prior 210   The caller must not call this function again after a prior
209   call returned an error (including EOF). 211   call returned an error (including EOF).
210   */ 212   */
211   template<MutableBufferSequence MB> 213   template<MutableBufferSequence MB>
212   auto 214   auto
213   read_some(MB buffers); 215   read_some(MB buffers);
214   216  
215   /** Initiate a complete read operation. 217   /** Initiate a complete read operation.
216   218  
217   Reads data into the provided buffer sequence by forwarding 219   Reads data into the provided buffer sequence by forwarding
218   to the underlying source's `read` operation. Large buffer 220   to the underlying source's `read` operation. Large buffer
219   sequences are processed in windows, with each window 221   sequences are processed in windows, with each window
220   forwarded as a separate `read` call to the underlying source. 222   forwarded as a separate `read` call to the underlying source.
221   The operation completes when the entire buffer sequence is 223   The operation completes when the entire buffer sequence is
222   filled, end-of-file is reached, or an error occurs. 224   filled, end-of-file is reached, or an error occurs.
223   225  
224   @param buffers The buffer sequence to read into. 226   @param buffers The buffer sequence to read into.
225   227  
226 - @return An awaitable yielding `(error_code,std::size_t)`. 228 + @return An awaitable that await-returns `(error_code,std::size_t)`.
227   229  
228   @par Immediate Completion 230   @par Immediate Completion
229   The operation completes immediately without suspending 231   The operation completes immediately without suspending
230   the calling coroutine when: 232   the calling coroutine when:
231   @li The buffer sequence is empty, returning `{error_code{}, 0}`. 233   @li The buffer sequence is empty, returning `{error_code{}, 0}`.
232   @li The underlying source's `read` awaitable reports 234   @li The underlying source's `read` awaitable reports
233   immediate readiness via `await_ready`. 235   immediate readiness via `await_ready`.
234   236  
235   @par Postconditions 237   @par Postconditions
236   Exactly one of the following is true on return: 238   Exactly one of the following is true on return:
237   @li **Success**: `!ec` and `n == buffer_size(buffers)`. 239   @li **Success**: `!ec` and `n == buffer_size(buffers)`.
238   The entire buffer was filled. 240   The entire buffer was filled.
239   @li **End-of-stream or Error**: `ec` and `n` indicates 241   @li **End-of-stream or Error**: `ec` and `n` indicates
240   the number of bytes transferred before the failure. 242   the number of bytes transferred before the failure.
241   243  
242   @par Preconditions 244   @par Preconditions
243   The wrapper must contain a valid source (`has_value() == true`). 245   The wrapper must contain a valid source (`has_value() == true`).
244   The caller must not call this function again after a prior 246   The caller must not call this function again after a prior
245   call returned an error (including EOF). 247   call returned an error (including EOF).
246   */ 248   */
247   template<MutableBufferSequence MB> 249   template<MutableBufferSequence MB>
248   io_task<std::size_t> 250   io_task<std::size_t>
249   read(MB buffers); 251   read(MB buffers);
250   252  
251   protected: 253   protected:
252   /** Rebind to a new source after move. 254   /** Rebind to a new source after move.
253   255  
254   Updates the internal pointer to reference a new source object. 256   Updates the internal pointer to reference a new source object.
255   Used by owning wrappers after move assignment when the owned 257   Used by owning wrappers after move assignment when the owned
256   object has moved to a new location. 258   object has moved to a new location.
257   259  
258   @param new_source The new source to bind to. Must be the same 260   @param new_source The new source to bind to. Must be the same
259   type as the original source. 261   type as the original source.
260   262  
261   @note Terminates if called with a source of different type 263   @note Terminates if called with a source of different type
262   than the original. 264   than the original.
263   */ 265   */
264   template<ReadSource S> 266   template<ReadSource S>
265   void 267   void
266   rebind(S& new_source) noexcept 268   rebind(S& new_source) noexcept
267   { 269   {
268   if(vt_ != &vtable_for_impl<S>::value) 270   if(vt_ != &vtable_for_impl<S>::value)
269   std::terminate(); 271   std::terminate();
270   source_ = &new_source; 272   source_ = &new_source;
271   } 273   }
272   274  
273   private: 275   private:
274   auto 276   auto
275   read_(std::span<mutable_buffer const> buffers); 277   read_(std::span<mutable_buffer const> buffers);
276   }; 278   };
277 - //----------------------------------------------------------  
278 -  
279   279  
280   // ordered by call sequence for cache line coherence 280   // ordered by call sequence for cache line coherence
281   struct any_read_source::awaitable_ops 281   struct any_read_source::awaitable_ops
282   { 282   {
283   bool (*await_ready)(void*); 283   bool (*await_ready)(void*);
284   std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*); 284   std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
285   io_result<std::size_t> (*await_resume)(void*); 285   io_result<std::size_t> (*await_resume)(void*);
286   void (*destroy)(void*) noexcept; 286   void (*destroy)(void*) noexcept;
287   }; 287   };
288   288  
289   // ordered by call frequency for cache line coherence 289   // ordered by call frequency for cache line coherence
290   struct any_read_source::vtable 290   struct any_read_source::vtable
291   { 291   {
292   awaitable_ops const* (*construct_read_some_awaitable)( 292   awaitable_ops const* (*construct_read_some_awaitable)(
293   void* source, 293   void* source,
294   void* storage, 294   void* storage,
295   std::span<mutable_buffer const> buffers); 295   std::span<mutable_buffer const> buffers);
296   awaitable_ops const* (*construct_read_awaitable)( 296   awaitable_ops const* (*construct_read_awaitable)(
297   void* source, 297   void* source,
298   void* storage, 298   void* storage,
299   std::span<mutable_buffer const> buffers); 299   std::span<mutable_buffer const> buffers);
300   std::size_t awaitable_size; 300   std::size_t awaitable_size;
301   std::size_t awaitable_align; 301   std::size_t awaitable_align;
302   void (*destroy)(void*) noexcept; 302   void (*destroy)(void*) noexcept;
303   }; 303   };
304   304  
305   template<ReadSource S> 305   template<ReadSource S>
306   struct any_read_source::vtable_for_impl 306   struct any_read_source::vtable_for_impl
307   { 307   {
308   using ReadSomeAwaitable = decltype(std::declval<S&>().read_some( 308   using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
309   std::span<mutable_buffer const>{})); 309   std::span<mutable_buffer const>{}));
310   using ReadAwaitable = decltype(std::declval<S&>().read( 310   using ReadAwaitable = decltype(std::declval<S&>().read(
311   std::span<mutable_buffer const>{})); 311   std::span<mutable_buffer const>{}));
312   312  
313   static void 313   static void
HITCBC 314   6 do_destroy_impl(void* source) noexcept 314   6 do_destroy_impl(void* source) noexcept
315   { 315   {
HITCBC 316   6 static_cast<S*>(source)->~S(); 316   6 static_cast<S*>(source)->~S();
HITCBC 317   6 } 317   6 }
318   318  
319   static awaitable_ops const* 319   static awaitable_ops const*
HITCBC 320   52 construct_read_some_awaitable_impl( 320   52 construct_read_some_awaitable_impl(
321   void* source, 321   void* source,
322   void* storage, 322   void* storage,
323   std::span<mutable_buffer const> buffers) 323   std::span<mutable_buffer const> buffers)
324   { 324   {
HITCBC 325   52 auto& s = *static_cast<S*>(source); 325   52 auto& s = *static_cast<S*>(source);
HITCBC 326   52 ::new(storage) ReadSomeAwaitable(s.read_some(buffers)); 326   52 ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
327   327  
328   static constexpr awaitable_ops ops = { 328   static constexpr awaitable_ops ops = {
HITCBC 329   52 +[](void* p) { 329   52 +[](void* p) {
HITCBC 330   52 return static_cast<ReadSomeAwaitable*>(p)->await_ready(); 330   52 return static_cast<ReadSomeAwaitable*>(p)->await_ready();
331   }, 331   },
HITCBC 332   2 +[](void* p, std::coroutine_handle<> h, io_env const* env) { 332   2 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
HITCBC 333   2 return detail::call_await_suspend( 333   2 return detail::call_await_suspend(
HITCBC 334   2 static_cast<ReadSomeAwaitable*>(p), h, env); 334   2 static_cast<ReadSomeAwaitable*>(p), h, env);
335   }, 335   },
HITCBC 336   50 +[](void* p) { 336   50 +[](void* p) {
HITCBC 337   50 return static_cast<ReadSomeAwaitable*>(p)->await_resume(); 337   50 return static_cast<ReadSomeAwaitable*>(p)->await_resume();
338   }, 338   },
HITCBC 339   54 +[](void* p) noexcept { 339   54 +[](void* p) noexcept {
HITCBC 340   2 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable(); 340   2 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
341   } 341   }
342   }; 342   };
HITCBC 343   52 return &ops; 343   52 return &ops;
344   } 344   }
345   345  
346   static awaitable_ops const* 346   static awaitable_ops const*
HITCBC 347   116 construct_read_awaitable_impl( 347   116 construct_read_awaitable_impl(
348   void* source, 348   void* source,
349   void* storage, 349   void* storage,
350   std::span<mutable_buffer const> buffers) 350   std::span<mutable_buffer const> buffers)
351   { 351   {
HITCBC 352   116 auto& s = *static_cast<S*>(source); 352   116 auto& s = *static_cast<S*>(source);
HITCBC 353   116 ::new(storage) ReadAwaitable(s.read(buffers)); 353   116 ::new(storage) ReadAwaitable(s.read(buffers));
354   354  
355   static constexpr awaitable_ops ops = { 355   static constexpr awaitable_ops ops = {
HITCBC 356   116 +[](void* p) { 356   116 +[](void* p) {
HITCBC 357   116 return static_cast<ReadAwaitable*>(p)->await_ready(); 357   116 return static_cast<ReadAwaitable*>(p)->await_ready();
358   }, 358   },
MISUBC 359   +[](void* p, std::coroutine_handle<> h, io_env const* env) { 359   +[](void* p, std::coroutine_handle<> h, io_env const* env) {
MISUBC 360   return detail::call_await_suspend( 360   return detail::call_await_suspend(
MISUBC 361   static_cast<ReadAwaitable*>(p), h, env); 361   static_cast<ReadAwaitable*>(p), h, env);
362   }, 362   },
HITCBC 363   116 +[](void* p) { 363   116 +[](void* p) {
HITCBC 364   116 return static_cast<ReadAwaitable*>(p)->await_resume(); 364   116 return static_cast<ReadAwaitable*>(p)->await_resume();
365   }, 365   },
HITCBC 366   116 +[](void* p) noexcept { 366   116 +[](void* p) noexcept {
MISUBC 367   static_cast<ReadAwaitable*>(p)->~ReadAwaitable(); 367   static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
368   } 368   }
369   }; 369   };
HITCBC 370   116 return &ops; 370   116 return &ops;
371   } 371   }
372   372  
373   static constexpr std::size_t max_awaitable_size = 373   static constexpr std::size_t max_awaitable_size =
374   sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable) 374   sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
375   ? sizeof(ReadSomeAwaitable) 375   ? sizeof(ReadSomeAwaitable)
376   : sizeof(ReadAwaitable); 376   : sizeof(ReadAwaitable);
377   static constexpr std::size_t max_awaitable_align = 377   static constexpr std::size_t max_awaitable_align =
378   alignof(ReadSomeAwaitable) > alignof(ReadAwaitable) 378   alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
379   ? alignof(ReadSomeAwaitable) 379   ? alignof(ReadSomeAwaitable)
380   : alignof(ReadAwaitable); 380   : alignof(ReadAwaitable);
381   381  
382   static constexpr vtable value = { 382   static constexpr vtable value = {
383   &construct_read_some_awaitable_impl, 383   &construct_read_some_awaitable_impl,
384   &construct_read_awaitable_impl, 384   &construct_read_awaitable_impl,
385   max_awaitable_size, 385   max_awaitable_size,
386   max_awaitable_align, 386   max_awaitable_align,
387   &do_destroy_impl 387   &do_destroy_impl
388   }; 388   };
389   }; 389   };
390 - //----------------------------------------------------------  
391 -  
392   390  
393   inline 391   inline
HITCBC 394   145 any_read_source::~any_read_source() 392   145 any_read_source::~any_read_source()
395   { 393   {
HITCBC 396   145 if(storage_) 394   145 if(storage_)
397   { 395   {
HITCBC 398   6 vt_->destroy(source_); 396   6 vt_->destroy(source_);
HITCBC 399   6 ::operator delete(storage_); 397   6 ::operator delete(storage_);
400   } 398   }
HITCBC 401   145 if(cached_awaitable_) 399   145 if(cached_awaitable_)
402   { 400   {
HITCBC 403   139 if(active_ops_) 401   139 if(active_ops_)
HITCBC 404   1 active_ops_->destroy(cached_awaitable_); 402   1 active_ops_->destroy(cached_awaitable_);
HITCBC 405   139 ::operator delete(cached_awaitable_); 403   139 ::operator delete(cached_awaitable_);
406   } 404   }
HITCBC 407   145 } 405   145 }
408   406  
409   inline any_read_source& 407   inline any_read_source&
HITCBC 410   4 any_read_source::operator=(any_read_source&& other) noexcept 408   4 any_read_source::operator=(any_read_source&& other) noexcept
411   { 409   {
HITCBC 412   4 if(this != &other) 410   4 if(this != &other)
413   { 411   {
HITCBC 414   3 if(storage_) 412   3 if(storage_)
415   { 413   {
MISUBC 416   vt_->destroy(source_); 414   vt_->destroy(source_);
MISUBC 417   ::operator delete(storage_); 415   ::operator delete(storage_);
418   } 416   }
HITCBC 419   3 if(cached_awaitable_) 417   3 if(cached_awaitable_)
420   { 418   {
HITCBC 421   2 if(active_ops_) 419   2 if(active_ops_)
HITCBC 422   1 active_ops_->destroy(cached_awaitable_); 420   1 active_ops_->destroy(cached_awaitable_);
HITCBC 423   2 ::operator delete(cached_awaitable_); 421   2 ::operator delete(cached_awaitable_);
424   } 422   }
HITCBC 425   3 source_ = std::exchange(other.source_, nullptr); 423   3 source_ = std::exchange(other.source_, nullptr);
HITCBC 426   3 vt_ = std::exchange(other.vt_, nullptr); 424   3 vt_ = std::exchange(other.vt_, nullptr);
HITCBC 427   3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr); 425   3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
HITCBC 428   3 storage_ = std::exchange(other.storage_, nullptr); 426   3 storage_ = std::exchange(other.storage_, nullptr);
HITCBC 429   3 active_ops_ = std::exchange(other.active_ops_, nullptr); 427   3 active_ops_ = std::exchange(other.active_ops_, nullptr);
430   } 428   }
HITCBC 431   4 return *this; 429   4 return *this;
432   } 430   }
433   431  
434   template<ReadSource S> 432   template<ReadSource S>
435   requires (!std::same_as<std::decay_t<S>, any_read_source>) 433   requires (!std::same_as<std::decay_t<S>, any_read_source>)
HITCBC 436   6 any_read_source::any_read_source(S s) 434   6 any_read_source::any_read_source(S s)
HITCBC 437   6 : vt_(&vtable_for_impl<S>::value) 435   6 : vt_(&vtable_for_impl<S>::value)
438   { 436   {
439   struct guard { 437   struct guard {
440   any_read_source* self; 438   any_read_source* self;
441   bool committed = false; 439   bool committed = false;
HITCBC 442   6 ~guard() { 440   6 ~guard() {
HITCBC 443   6 if(!committed && self->storage_) { 441   6 if(!committed && self->storage_) {
MISUBC 444   self->vt_->destroy(self->source_); 442   self->vt_->destroy(self->source_);
MISUBC 445   ::operator delete(self->storage_); 443   ::operator delete(self->storage_);
MISUBC 446   self->storage_ = nullptr; 444   self->storage_ = nullptr;
MISUBC 447   self->source_ = nullptr; 445   self->source_ = nullptr;
448   } 446   }
HITCBC 449   6 } 447   6 }
HITCBC 450   6 } g{this}; 448   6 } g{this};
451   449  
HITCBC 452   6 storage_ = ::operator new(sizeof(S)); 450   6 storage_ = ::operator new(sizeof(S));
HITCBC 453   6 source_ = ::new(storage_) S(std::move(s)); 451   6 source_ = ::new(storage_) S(std::move(s));
454   452  
455   // Preallocate the awaitable storage 453   // Preallocate the awaitable storage
HITCBC 456   6 cached_awaitable_ = ::operator new(vt_->awaitable_size); 454   6 cached_awaitable_ = ::operator new(vt_->awaitable_size);
457   455  
HITCBC 458   6 g.committed = true; 456   6 g.committed = true;
HITCBC 459   6 } 457   6 }
460   458  
461   template<ReadSource S> 459   template<ReadSource S>
HITCBC 462   135 any_read_source::any_read_source(S* s) 460   135 any_read_source::any_read_source(S* s)
HITCBC 463   135 : source_(s) 461   135 : source_(s)
HITCBC 464   135 , vt_(&vtable_for_impl<S>::value) 462   135 , vt_(&vtable_for_impl<S>::value)
465   { 463   {
466   // Preallocate the awaitable storage 464   // Preallocate the awaitable storage
HITCBC 467   135 cached_awaitable_ = ::operator new(vt_->awaitable_size); 465   135 cached_awaitable_ = ::operator new(vt_->awaitable_size);
HITCBC 468   135 } 466   135 }
469 - //----------------------------------------------------------  
470 -  
471   467  
472   template<MutableBufferSequence MB> 468   template<MutableBufferSequence MB>
473   auto 469   auto
HITCBC 474   54 any_read_source::read_some(MB buffers) 470   54 any_read_source::read_some(MB buffers)
475   { 471   {
476   struct awaitable 472   struct awaitable
477   { 473   {
478   any_read_source* self_; 474   any_read_source* self_;
479 - mutable_buffer_array<detail::max_iovec_> ba_; 475 + detail::mutable_buffer_array<detail::max_iovec_> ba_;
480   476  
HITCBC 481   54 awaitable(any_read_source* self, MB const& buffers) 477   54 awaitable(any_read_source* self, MB const& buffers)
HITCBC 482   54 : self_(self) 478   54 : self_(self)
HITCBC 483   54 , ba_(buffers) 479   54 , ba_(buffers)
484   { 480   {
HITCBC 485   54 } 481   54 }
486   482  
487   bool 483   bool
HITCBC 488   54 await_ready() const noexcept 484   54 await_ready() const noexcept
489   { 485   {
HITCBC 490   54 return ba_.to_span().empty(); 486   54 return ba_.to_span().empty();
491   } 487   }
492   488  
493   std::coroutine_handle<> 489   std::coroutine_handle<>
HITCBC 494   52 await_suspend(std::coroutine_handle<> h, io_env const* env) 490   52 await_suspend(std::coroutine_handle<> h, io_env const* env)
495   { 491   {
HITCBC 496   52 self_->active_ops_ = self_->vt_->construct_read_some_awaitable( 492   52 self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
HITCBC 497   52 self_->source_, 493   52 self_->source_,
HITCBC 498   52 self_->cached_awaitable_, 494   52 self_->cached_awaitable_,
HITCBC 499   52 ba_.to_span()); 495   52 ba_.to_span());
500   496  
HITCBC 501   52 if(self_->active_ops_->await_ready(self_->cached_awaitable_)) 497   52 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
HITCBC 502   50 return h; 498   50 return h;
503   499  
HITCBC 504   2 return self_->active_ops_->await_suspend( 500   2 return self_->active_ops_->await_suspend(
HITCBC 505   2 self_->cached_awaitable_, h, env); 501   2 self_->cached_awaitable_, h, env);
506   } 502   }
507   503  
508   io_result<std::size_t> 504   io_result<std::size_t>
HITCBC 509   52 await_resume() 505   52 await_resume()
510   { 506   {
HITCBC 511   52 if(ba_.to_span().empty()) 507   52 if(ba_.to_span().empty())
HITCBC 512   2 return {{}, 0}; 508   2 return {{}, 0};
513   509  
514   struct guard { 510   struct guard {
515   any_read_source* self; 511   any_read_source* self;
HITCBC 516   50 ~guard() { 512   50 ~guard() {
HITCBC 517   50 self->active_ops_->destroy(self->cached_awaitable_); 513   50 self->active_ops_->destroy(self->cached_awaitable_);
HITCBC 518   50 self->active_ops_ = nullptr; 514   50 self->active_ops_ = nullptr;
HITCBC 519   50 } 515   50 }
HITCBC 520   50 } g{self_}; 516   50 } g{self_};
HITCBC 521   50 return self_->active_ops_->await_resume( 517   50 return self_->active_ops_->await_resume(
HITCBC 522   50 self_->cached_awaitable_); 518   50 self_->cached_awaitable_);
HITCBC 523   50 } 519   50 }
524   }; 520   };
HITCBC 525   54 return awaitable(this, buffers); 521   54 return awaitable(this, buffers);
526   } 522   }
527   523  
528   inline auto 524   inline auto
HITCBC 529   116 any_read_source::read_(std::span<mutable_buffer const> buffers) 525   116 any_read_source::read_(std::span<mutable_buffer const> buffers)
530   { 526   {
531   struct awaitable 527   struct awaitable
532   { 528   {
533   any_read_source* self_; 529   any_read_source* self_;
534   std::span<mutable_buffer const> buffers_; 530   std::span<mutable_buffer const> buffers_;
535   531  
536   bool 532   bool
HITCBC 537   116 await_ready() const noexcept 533   116 await_ready() const noexcept
538   { 534   {
HITCBC 539   116 return false; 535   116 return false;
540   } 536   }
541   537  
542   std::coroutine_handle<> 538   std::coroutine_handle<>
HITCBC 543   116 await_suspend(std::coroutine_handle<> h, io_env const* env) 539   116 await_suspend(std::coroutine_handle<> h, io_env const* env)
544   { 540   {
HITCBC 545   232 self_->active_ops_ = self_->vt_->construct_read_awaitable( 541   232 self_->active_ops_ = self_->vt_->construct_read_awaitable(
HITCBC 546   116 self_->source_, 542   116 self_->source_,
HITCBC 547   116 self_->cached_awaitable_, 543   116 self_->cached_awaitable_,
548   buffers_); 544   buffers_);
549   545  
HITCBC 550   116 if(self_->active_ops_->await_ready(self_->cached_awaitable_)) 546   116 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
HITCBC 551   116 return h; 547   116 return h;
552   548  
MISUBC 553   return self_->active_ops_->await_suspend( 549   return self_->active_ops_->await_suspend(
MISUBC 554   self_->cached_awaitable_, h, env); 550   self_->cached_awaitable_, h, env);
555   } 551   }
556   552  
557   io_result<std::size_t> 553   io_result<std::size_t>
HITCBC 558   116 await_resume() 554   116 await_resume()
559   { 555   {
560   struct guard { 556   struct guard {
561   any_read_source* self; 557   any_read_source* self;
HITCBC 562   116 ~guard() { 558   116 ~guard() {
HITCBC 563   116 self->active_ops_->destroy(self->cached_awaitable_); 559   116 self->active_ops_->destroy(self->cached_awaitable_);
HITCBC 564   116 self->active_ops_ = nullptr; 560   116 self->active_ops_ = nullptr;
HITCBC 565   116 } 561   116 }
HITCBC 566   116 } g{self_}; 562   116 } g{self_};
HITCBC 567   116 return self_->active_ops_->await_resume( 563   116 return self_->active_ops_->await_resume(
HITCBC 568   200 self_->cached_awaitable_); 564   200 self_->cached_awaitable_);
HITCBC 569   116 } 565   116 }
570   }; 566   };
HITCBC 571   116 return awaitable{this, buffers}; 567   116 return awaitable{this, buffers};
572   } 568   }
573   569  
574   template<MutableBufferSequence MB> 570   template<MutableBufferSequence MB>
575   io_task<std::size_t> 571   io_task<std::size_t>
HITCBC 576   110 any_read_source::read(MB buffers) 572   110 any_read_source::read(MB buffers)
577   { 573   {
578   buffer_param bp(buffers); 574   buffer_param bp(buffers);
579   std::size_t total = 0; 575   std::size_t total = 0;
580   576  
581   for(;;) 577   for(;;)
582   { 578   {
583   auto bufs = bp.data(); 579   auto bufs = bp.data();
584   if(bufs.empty()) 580   if(bufs.empty())
585   break; 581   break;
586   582  
587   auto [ec, n] = co_await read_(bufs); 583   auto [ec, n] = co_await read_(bufs);
588   total += n; 584   total += n;
589   if(ec) 585   if(ec)
590   co_return {ec, total}; 586   co_return {ec, total};
591   bp.consume(n); 587   bp.consume(n);
592   } 588   }
593   589  
594   co_return {{}, total}; 590   co_return {{}, total};
HITCBC 595   220 } 591   220 }
596   592  
597   } // namespace capy 593   } // namespace capy
598   } // namespace boost 594   } // namespace boost
599   595  
600   #endif 596   #endif