94.78% Lines (109/115) 95.00% Functions (19/20)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_TEST_STREAM_HPP 10   #ifndef BOOST_CAPY_TEST_STREAM_HPP
11   #define BOOST_CAPY_TEST_STREAM_HPP 11   #define BOOST_CAPY_TEST_STREAM_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/buffers.hpp> 14   #include <boost/capy/buffers.hpp>
15   #include <boost/capy/buffers/buffer_copy.hpp> 15   #include <boost/capy/buffers/buffer_copy.hpp>
16   #include <boost/capy/buffers/make_buffer.hpp> 16   #include <boost/capy/buffers/make_buffer.hpp>
  17 + #include <boost/capy/continuation.hpp>
17   #include <coroutine> 18   #include <coroutine>
18   #include <boost/capy/ex/io_env.hpp> 19   #include <boost/capy/ex/io_env.hpp>
19   #include <boost/capy/io_result.hpp> 20   #include <boost/capy/io_result.hpp>
20   #include <boost/capy/error.hpp> 21   #include <boost/capy/error.hpp>
21   #include <boost/capy/read.hpp> 22   #include <boost/capy/read.hpp>
22   #include <boost/capy/task.hpp> 23   #include <boost/capy/task.hpp>
23   #include <boost/capy/test/fuse.hpp> 24   #include <boost/capy/test/fuse.hpp>
24   #include <boost/capy/test/run_blocking.hpp> 25   #include <boost/capy/test/run_blocking.hpp>
25   26  
26   #include <memory> 27   #include <memory>
27   #include <stop_token> 28   #include <stop_token>
28   #include <string> 29   #include <string>
29   #include <string_view> 30   #include <string_view>
30   #include <utility> 31   #include <utility>
31   32  
32   namespace boost { 33   namespace boost {
33   namespace capy { 34   namespace capy {
34   namespace test { 35   namespace test {
35   36  
36   /** A connected stream for testing bidirectional I/O. 37   /** A connected stream for testing bidirectional I/O.
37   38  
38   Streams are created in pairs via @ref make_stream_pair. 39   Streams are created in pairs via @ref make_stream_pair.
39   Data written to one end becomes available for reading on 40   Data written to one end becomes available for reading on
40   the other. If no data is available when @ref read_some 41   the other. If no data is available when @ref read_some
41   is called, the calling coroutine suspends until the peer 42   is called, the calling coroutine suspends until the peer
42   calls @ref write_some. The shared @ref fuse enables error 43   calls @ref write_some. The shared @ref fuse enables error
43   injection at controlled points in both directions. 44   injection at controlled points in both directions.
44   45  
45   When the fuse injects an error or throws on one end, the 46   When the fuse injects an error or throws on one end, the
46   other end is automatically closed: any suspended reader is 47   other end is automatically closed: any suspended reader is
47   resumed with `error::eof`, and subsequent operations on 48   resumed with `error::eof`, and subsequent operations on
48   both ends return `error::eof`. Calling @ref close on one 49   both ends return `error::eof`. Calling @ref close on one
49   end signals eof to the peer's reads after draining any 50   end signals eof to the peer's reads after draining any
50   buffered data, while the peer may still write. 51   buffered data, while the peer may still write.
51   52  
52   @par Thread Safety 53   @par Thread Safety
53   Single-threaded only. Both ends of the pair must be 54   Single-threaded only. Both ends of the pair must be
54   accessed from the same thread. Concurrent access is 55   accessed from the same thread. Concurrent access is
55   undefined behavior. 56   undefined behavior.
56   57  
57   @par Example 58   @par Example
58   @code 59   @code
59   fuse f; 60   fuse f;
60   auto [a, b] = make_stream_pair( f ); 61   auto [a, b] = make_stream_pair( f );
61   62  
62   auto r = f.armed( [&]( fuse& ) -> task<> { 63   auto r = f.armed( [&]( fuse& ) -> task<> {
63   auto [ec, n] = co_await a.write_some( 64   auto [ec, n] = co_await a.write_some(
64   const_buffer( "hello", 5 ) ); 65   const_buffer( "hello", 5 ) );
65   if( ec ) 66   if( ec )
66   co_return; 67   co_return;
67   68  
68   char buf[32]; 69   char buf[32];
69   auto [ec2, n2] = co_await b.read_some( 70   auto [ec2, n2] = co_await b.read_some(
70   mutable_buffer( buf, sizeof( buf ) ) ); 71   mutable_buffer( buf, sizeof( buf ) ) );
71   if( ec2 ) 72   if( ec2 )
72   co_return; 73   co_return;
73   // buf contains "hello" 74   // buf contains "hello"
74   } ); 75   } );
75   @endcode 76   @endcode
76   77  
77   @see make_stream_pair, fuse 78   @see make_stream_pair, fuse
78   */ 79   */
79   class stream 80   class stream
80   { 81   {
81   // Single-threaded only. No concurrent access to either 82   // Single-threaded only. No concurrent access to either
82   // end of the pair. Both streams and all operations must 83   // end of the pair. Both streams and all operations must
83   // run on the same thread. 84   // run on the same thread.
84   85  
85   struct half 86   struct half
86   { 87   {
87   std::string buf; 88   std::string buf;
88   std::size_t max_read_size = std::size_t(-1); 89   std::size_t max_read_size = std::size_t(-1);
89 - std::coroutine_handle<> pending_h{}; 90 + continuation pending_cont_;
90   executor_ref pending_ex; 91   executor_ref pending_ex;
91   bool eof = false; 92   bool eof = false;
92   }; 93   };
93   94  
94   struct state 95   struct state
95   { 96   {
96   fuse f; 97   fuse f;
97   bool closed = false; 98   bool closed = false;
98   half sides[2]; 99   half sides[2];
99   100  
HITCBC 100   279 explicit state(fuse f_) noexcept 101   280 explicit state(fuse f_) noexcept
HITCBC 101   837 : f(std::move(f_)) 102   840 : f(std::move(f_))
102   { 103   {
HITCBC 103   279 } 104   280 }
104   105  
105   // Set closed and resume any suspended readers 106   // Set closed and resume any suspended readers
106   // with eof on both sides. 107   // with eof on both sides.
HITCBC 107   208 void close() 108   208 void close()
108   { 109   {
HITCBC 109   208 closed = true; 110   208 closed = true;
HITCBC 110   624 for(auto& side : sides) 111   624 for(auto& side : sides)
111   { 112   {
HITCBC 112 - 416 if(side.pending_h) 113 + 416 if(side.pending_cont_.h)
113   { 114   {
HITCBC 114 - 12 auto h = side.pending_h; 115 + 12 side.pending_ex.post(side.pending_cont_);
HITCBC 115 - 12 side.pending_h = {}; 116 + 12 side.pending_cont_.h = {};
DCB 116 - 12 auto ex = side.pending_ex;  
DCB 117 - 12 ex.post(h);  
HITCBC 118   12 side.pending_ex = {}; 117   12 side.pending_ex = {};
119   } 118   }
120   } 119   }
HITCBC 121   208 } 120   208 }
122   }; 121   };
123   122  
124   // Wraps the maybe_fail() call. If the guard is 123   // Wraps the maybe_fail() call. If the guard is
125   // not disarmed before destruction (fuse returned 124   // not disarmed before destruction (fuse returned
126   // an error, or threw an exception), closes both 125   // an error, or threw an exception), closes both
127   // ends so any suspended peer gets eof. 126   // ends so any suspended peer gets eof.
128   struct close_guard 127   struct close_guard
129   { 128   {
130   state* st; 129   state* st;
131   bool armed = true; 130   bool armed = true;
HITCBC 132   298 void disarm() noexcept { armed = false; } 131   300 void disarm() noexcept { armed = false; }
HITCBC 133   506 ~close_guard() noexcept(false) { if(armed) st->close(); } 132   508 ~close_guard() noexcept(false) { if(armed) st->close(); }
134   }; 133   };
135   134  
136   std::shared_ptr<state> state_; 135   std::shared_ptr<state> state_;
137   int index_; 136   int index_;
138   137  
HITCBC 139   558 stream( 138   560 stream(
140   std::shared_ptr<state> sp, 139   std::shared_ptr<state> sp,
141   int index) noexcept 140   int index) noexcept
HITCBC 142   558 : state_(std::move(sp)) 141   560 : state_(std::move(sp))
HITCBC 143   558 , index_(index) 142   560 , index_(index)
144   { 143   {
HITCBC 145   558 } 144   560 }
146   145  
147   friend std::pair<stream, stream> 146   friend std::pair<stream, stream>
148   make_stream_pair(fuse); 147   make_stream_pair(fuse);
149   148  
150   public: 149   public:
151   stream(stream const&) = delete; 150   stream(stream const&) = delete;
152   stream& operator=(stream const&) = delete; 151   stream& operator=(stream const&) = delete;
HITCBC 153   658 stream(stream&&) = default; 152   660 stream(stream&&) = default;
154   stream& operator=(stream&&) = default; 153   stream& operator=(stream&&) = default;
155   154  
156   /** Signal end-of-stream to the peer. 155   /** Signal end-of-stream to the peer.
157   156  
158   Marks the peer's read direction as closed. 157   Marks the peer's read direction as closed.
159   If the peer is suspended in @ref read_some, 158   If the peer is suspended in @ref read_some,
160   it is resumed. The peer drains any buffered 159   it is resumed. The peer drains any buffered
161   data before receiving `error::eof`. Writes 160   data before receiving `error::eof`. Writes
162   from the peer are unaffected. 161   from the peer are unaffected.
163   */ 162   */
164   void 163   void
HITCBC 165   3 close() 164   3 close()
166   { 165   {
HITCBC 167   3 int peer = 1 - index_; 166   3 int peer = 1 - index_;
HITCBC 168   3 auto& side = state_->sides[peer]; 167   3 auto& side = state_->sides[peer];
HITCBC 169   3 side.eof = true; 168   3 side.eof = true;
HITCBC 170 - 3 if(side.pending_h) 169 + 3 if(side.pending_cont_.h)
171   { 170   {
HITCBC 172 - 1 auto h = side.pending_h; 171 + 1 side.pending_ex.post(side.pending_cont_);
HITCBC 173 - 1 side.pending_h = {}; 172 + 1 side.pending_cont_.h = {};
DCB 174 - 1 auto ex = side.pending_ex;  
DCB 175 - 1 ex.post(h);  
HITCBC 176   1 side.pending_ex = {}; 173   1 side.pending_ex = {};
177   } 174   }
HITCBC 178   3 } 175   3 }
179   176  
180   /** Set the maximum bytes returned per read. 177   /** Set the maximum bytes returned per read.
181   178  
182   Limits how many bytes @ref read_some returns in 179   Limits how many bytes @ref read_some returns in
183   a single call, simulating chunked network delivery. 180   a single call, simulating chunked network delivery.
184   The default is unlimited. 181   The default is unlimited.
185   182  
186   @param n Maximum bytes per read. 183   @param n Maximum bytes per read.
187   */ 184   */
188   void 185   void
HITCBC 189   54 set_max_read_size(std::size_t n) noexcept 186   54 set_max_read_size(std::size_t n) noexcept
190   { 187   {
HITCBC 191   54 state_->sides[index_].max_read_size = n; 188   54 state_->sides[index_].max_read_size = n;
HITCBC 192   54 } 189   54 }
193   190  
194   /** Asynchronously read data from the stream. 191   /** Asynchronously read data from the stream.
195   192  
196   Transfers up to `buffer_size(buffers)` bytes from 193   Transfers up to `buffer_size(buffers)` bytes from
197   data written by the peer. If no data is available, 194   data written by the peer. If no data is available,
198   the calling coroutine suspends until the peer calls 195   the calling coroutine suspends until the peer calls
199   @ref write_some. Before every read, the attached 196   @ref write_some. Before every read, the attached
200   @ref fuse is consulted to possibly inject an error. 197   @ref fuse is consulted to possibly inject an error.
201   If the fuse fires, the peer is automatically closed. 198   If the fuse fires, the peer is automatically closed.
202   If the stream is closed, returns `error::eof`. 199   If the stream is closed, returns `error::eof`.
203   The returned `std::size_t` is the number of bytes 200   The returned `std::size_t` is the number of bytes
204   transferred. 201   transferred.
205   202  
206   @param buffers The mutable buffer sequence to receive data. 203   @param buffers The mutable buffer sequence to receive data.
207   204  
208 - @return An awaitable yielding `(error_code,std::size_t)`. 205 + @return An awaitable that await-returns `(error_code,std::size_t)`.
209   206  
210   @see fuse, close 207   @see fuse, close
211   */ 208   */
212   template<MutableBufferSequence MB> 209   template<MutableBufferSequence MB>
213   auto 210   auto
HITCBC 214   274 read_some(MB buffers) 211   275 read_some(MB buffers)
215   { 212   {
216   struct awaitable 213   struct awaitable
217   { 214   {
218   stream* self_; 215   stream* self_;
219   MB buffers_; 216   MB buffers_;
220   217  
HITCBC 221   274 bool await_ready() const noexcept 218   275 bool await_ready() const noexcept
222   { 219   {
HITCBC 223   274 if(buffer_empty(buffers_)) 220   275 if(buffer_empty(buffers_))
HITCBC 224   8 return true; 221   8 return true;
HITCBC 225   266 auto* st = self_->state_.get(); 222   267 auto* st = self_->state_.get();
HITCBC 226   266 auto& side = st->sides[self_->index_]; 223   267 auto& side = st->sides[self_->index_];
HITCBC 227   530 return st->closed || side.eof || 224   532 return st->closed || side.eof ||
HITCBC 228   530 !side.buf.empty(); 225   532 !side.buf.empty();
229   } 226   }
230   227  
HITCBC 231   25 std::coroutine_handle<> await_suspend( 228   25 std::coroutine_handle<> await_suspend(
232   std::coroutine_handle<> h, 229   std::coroutine_handle<> h,
233   io_env const* env) noexcept 230   io_env const* env) noexcept
234   { 231   {
HITCBC 235   25 auto& side = self_->state_->sides[ 232   25 auto& side = self_->state_->sides[
HITCBC 236   25 self_->index_]; 233   25 self_->index_];
HITCBC 237 - 25 side.pending_h = h; 234 + 25 side.pending_cont_.h = h;
HITCBC 238   25 side.pending_ex = env->executor; 235   25 side.pending_ex = env->executor;
HITCBC 239   25 return std::noop_coroutine(); 236   25 return std::noop_coroutine();
240   } 237   }
241   238  
242   io_result<std::size_t> 239   io_result<std::size_t>
HITCBC 243   274 await_resume() 240   275 await_resume()
244   { 241   {
HITCBC 245   274 if(buffer_empty(buffers_)) 242   275 if(buffer_empty(buffers_))
HITCBC 246   8 return {{}, 0}; 243   8 return {{}, 0};
247   244  
HITCBC 248   266 auto* st = self_->state_.get(); 245   267 auto* st = self_->state_.get();
HITCBC 249   266 auto& side = st->sides[ 246   267 auto& side = st->sides[
HITCBC 250   266 self_->index_]; 247   267 self_->index_];
251   248  
HITCBC 252   266 if(st->closed) 249   267 if(st->closed)
HITCBC 253   12 return {error::eof, 0}; 250   12 return {error::eof, 0};
254   251  
HITCBC 255   254 if(side.eof && side.buf.empty()) 252   255 if(side.eof && side.buf.empty())
HITCBC 256   3 return {error::eof, 0}; 253   3 return {error::eof, 0};
257   254  
HITCBC 258   251 if(!side.eof) 255   252 if(!side.eof)
259   { 256   {
HITCBC 260   251 close_guard g{st}; 257   252 close_guard g{st};
HITCBC 261   251 auto ec = st->f.maybe_fail(); 258   252 auto ec = st->f.maybe_fail();
HITCBC 262   198 if(ec) 259   199 if(ec)
HITCBC 263   53 return {ec, 0}; 260   53 return {ec, 0};
HITCBC 264   145 g.disarm(); 261   146 g.disarm();
HITCBC 265   251 } 262   252 }
266   263  
HITCBC 267   290 std::size_t const n = buffer_copy( 264   292 std::size_t const n = buffer_copy(
HITCBC 268   145 buffers_, make_buffer(side.buf), 265   146 buffers_, make_buffer(side.buf),
269   side.max_read_size); 266   side.max_read_size);
HITCBC 270   145 side.buf.erase(0, n); 267   146 side.buf.erase(0, n);
HITCBC 271   145 return {{}, n}; 268   146 return {{}, n};
272   } 269   }
273   }; 270   };
HITCBC 274   274 return awaitable{this, buffers}; 271   275 return awaitable{this, buffers};
275   } 272   }
276   273  
277   /** Asynchronously write data to the stream. 274   /** Asynchronously write data to the stream.
278   275  
279   Transfers up to `buffer_size(buffers)` bytes to the 276   Transfers up to `buffer_size(buffers)` bytes to the
280   peer's incoming buffer. If the peer is suspended in 277   peer's incoming buffer. If the peer is suspended in
281   @ref read_some, it is resumed. Before every write, 278   @ref read_some, it is resumed. Before every write,
282   the attached @ref fuse is consulted to possibly inject 279   the attached @ref fuse is consulted to possibly inject
283   an error. If the fuse fires, the peer is automatically 280   an error. If the fuse fires, the peer is automatically
284   closed. If the stream is closed, returns `error::eof`. 281   closed. If the stream is closed, returns `error::eof`.
285   The returned `std::size_t` is the number of bytes 282   The returned `std::size_t` is the number of bytes
286   transferred. 283   transferred.
287   284  
288   @param buffers The const buffer sequence containing 285   @param buffers The const buffer sequence containing
289   data to write. 286   data to write.
290   287  
291 - @return An awaitable yielding `(error_code,std::size_t)`. 288 + @return An awaitable that await-returns `(error_code,std::size_t)`.
292   289  
293   @see fuse, close 290   @see fuse, close
294   */ 291   */
295   template<ConstBufferSequence CB> 292   template<ConstBufferSequence CB>
296   auto 293   auto
HITCBC 297   259 write_some(CB buffers) 294   260 write_some(CB buffers)
298   { 295   {
299   struct awaitable 296   struct awaitable
300   { 297   {
301   stream* self_; 298   stream* self_;
302   CB buffers_; 299   CB buffers_;
303   300  
HITCBC 304   259 bool await_ready() const noexcept { return true; } 301   260 bool await_ready() const noexcept { return true; }
305   302  
MISUBC 306   void await_suspend( 303   void await_suspend(
307   std::coroutine_handle<>, 304   std::coroutine_handle<>,
308   io_env const*) const noexcept 305   io_env const*) const noexcept
309   { 306   {
MISUBC 310   } 307   }
311   308  
312   io_result<std::size_t> 309   io_result<std::size_t>
HITCBC 313   259 await_resume() 310   260 await_resume()
314   { 311   {
HITCBC 315   259 std::size_t n = buffer_size(buffers_); 312   260 std::size_t n = buffer_size(buffers_);
HITCBC 316   259 if(n == 0) 313   260 if(n == 0)
HITCBC 317   4 return {{}, 0}; 314   4 return {{}, 0};
318   315  
HITCBC 319   255 auto* st = self_->state_.get(); 316   256 auto* st = self_->state_.get();
320   317  
HITCBC 321   255 if(st->closed) 318   256 if(st->closed)
MISUBC 322   return {error::eof, 0}; 319   return {error::eof, 0};
323   320  
HITCBC 324   255 close_guard g{st}; 321   256 close_guard g{st};
HITCBC 325   255 auto ec = st->f.maybe_fail(); 322   256 auto ec = st->f.maybe_fail();
HITCBC 326   204 if(ec) 323   205 if(ec)
HITCBC 327   51 return {ec, 0}; 324   51 return {ec, 0};
HITCBC 328   153 g.disarm(); 325   154 g.disarm();
329   326  
HITCBC 330   153 int peer = 1 - self_->index_; 327   154 int peer = 1 - self_->index_;
HITCBC 331   153 auto& side = st->sides[peer]; 328   154 auto& side = st->sides[peer];
332   329  
HITCBC 333   153 std::size_t const old_size = side.buf.size(); 330   154 std::size_t const old_size = side.buf.size();
HITCBC 334   153 side.buf.resize(old_size + n); 331   154 side.buf.resize(old_size + n);
HITCBC 335   153 buffer_copy(make_buffer( 332   154 buffer_copy(make_buffer(
HITCBC 336   153 side.buf.data() + old_size, n), 333   154 side.buf.data() + old_size, n),
HITCBC 337   153 buffers_, n); 334   154 buffers_, n);
338   335  
HITCBC 339 - 153 if(side.pending_h) 336 + 154 if(side.pending_cont_.h)
340   { 337   {
HITCBC 341 - 12 auto h = side.pending_h; 338 + 12 side.pending_ex.post(side.pending_cont_);
HITCBC 342 - 12 side.pending_h = {}; 339 + 12 side.pending_cont_.h = {};
DCB 343 - 12 auto ex = side.pending_ex;  
DCB 344 - 12 ex.post(h);  
HITCBC 345   12 side.pending_ex = {}; 340   12 side.pending_ex = {};
346   } 341   }
347   342  
HITCBC 348   153 return {{}, n}; 343   154 return {{}, n};
HITCBC 349   255 } 344   256 }
350   }; 345   };
HITCBC 351   259 return awaitable{this, buffers}; 346   260 return awaitable{this, buffers};
352   } 347   }
353   348  
354   /** Inject data into this stream's peer for reading. 349   /** Inject data into this stream's peer for reading.
355   350  
356   Appends data directly to the peer's incoming buffer, 351   Appends data directly to the peer's incoming buffer,
357   bypassing the fuse. If the peer is suspended in 352   bypassing the fuse. If the peer is suspended in
358   @ref read_some, it is resumed. This is test setup, 353   @ref read_some, it is resumed. This is test setup,
359   not an operation under test. 354   not an operation under test.
360   355  
361   @param sv The data to inject. 356   @param sv The data to inject.
362   357  
363   @see make_stream_pair 358   @see make_stream_pair
364   */ 359   */
365   void 360   void
HITCBC 366   86 provide(std::string_view sv) 361   87 provide(std::string_view sv)
367   { 362   {
HITCBC 368   86 int peer = 1 - index_; 363   87 int peer = 1 - index_;
HITCBC 369   86 auto& side = state_->sides[peer]; 364   87 auto& side = state_->sides[peer];
HITCBC 370   86 side.buf.append(sv); 365   87 side.buf.append(sv);
HITCBC 371 - 86 if(side.pending_h) 366 + 87 if(side.pending_cont_.h)
372   { 367   {
MISUBC 373 - auto h = side.pending_h; 368 + side.pending_ex.post(side.pending_cont_);
MISUBC 374 - side.pending_h = {}; 369 + side.pending_cont_.h = {};
DUB 375 - auto ex = side.pending_ex;  
DUB 376 - ex.post(h);  
MISUBC 377   side.pending_ex = {}; 370   side.pending_ex = {};
378   } 371   }
HITCBC 379   86 } 372   87 }
380   373  
381   /** Read from this stream and verify the content. 374   /** Read from this stream and verify the content.
382   375  
383   Reads exactly `expected.size()` bytes from the stream 376   Reads exactly `expected.size()` bytes from the stream
384   and compares against the expected string. The read goes 377   and compares against the expected string. The read goes
385   through the normal path including the fuse. 378   through the normal path including the fuse.
386   379  
387   @param expected The expected content. 380   @param expected The expected content.
388   381  
389   @return A pair of `(error_code, bool)`. The error_code 382   @return A pair of `(error_code, bool)`. The error_code
390   is set if a read error occurs (e.g. fuse injection). 383   is set if a read error occurs (e.g. fuse injection).
391   The bool is true if the data matches. 384   The bool is true if the data matches.
392   385  
393   @see provide 386   @see provide
394   */ 387   */
395   std::pair<std::error_code, bool> 388   std::pair<std::error_code, bool>
HITCBC 396   38 expect(std::string_view expected) 389   38 expect(std::string_view expected)
397   { 390   {
HITCBC 398   38 std::error_code result; 391   38 std::error_code result;
HITCBC 399   38 bool match = false; 392   38 bool match = false;
HITCBC 400   141 run_blocking()([]( 393   141 run_blocking()([](
401   stream& self, 394   stream& self,
402   std::string_view expected, 395   std::string_view expected,
403   std::error_code& result, 396   std::error_code& result,
404   bool& match) -> task<> 397   bool& match) -> task<>
405   { 398   {
406   std::string buf(expected.size(), '\0'); 399   std::string buf(expected.size(), '\0');
407   auto [ec, n] = co_await read( 400   auto [ec, n] = co_await read(
408   self, mutable_buffer( 401   self, mutable_buffer(
409   buf.data(), buf.size())); 402   buf.data(), buf.size()));
410   if(ec) 403   if(ec)
411   { 404   {
412   result = ec; 405   result = ec;
413   co_return; 406   co_return;
414   } 407   }
415   match = (std::string_view( 408   match = (std::string_view(
416   buf.data(), n) == expected); 409   buf.data(), n) == expected);
HITCBC 417   161 }(*this, expected, result, match)); 410   161 }(*this, expected, result, match));
HITCBC 418   58 return {result, match}; 411   58 return {result, match};
419   } 412   }
420   413  
421   /** Return the stream's pending read data. 414   /** Return the stream's pending read data.
422   415  
423   Returns a view of the data waiting to be read 416   Returns a view of the data waiting to be read
424   from this stream. This is a direct peek at the 417   from this stream. This is a direct peek at the
425   internal buffer, bypassing the fuse. 418   internal buffer, bypassing the fuse.
426   419  
427   @return A view of the pending data. 420   @return A view of the pending data.
428   421  
429   @see provide, expect 422   @see provide, expect
430   */ 423   */
431   std::string_view 424   std::string_view
432   data() const noexcept 425   data() const noexcept
433   { 426   {
434   return state_->sides[index_].buf; 427   return state_->sides[index_].buf;
435   } 428   }
436   }; 429   };
437   430  
438   /** Create a connected pair of test streams. 431   /** Create a connected pair of test streams.
439   432  
440   Data written to one stream becomes readable on the other. 433   Data written to one stream becomes readable on the other.
441   If a coroutine calls @ref stream::read_some when no data 434   If a coroutine calls @ref stream::read_some when no data
442   is available, it suspends until the peer writes. Before 435   is available, it suspends until the peer writes. Before
443   every read or write, the @ref fuse is consulted to 436   every read or write, the @ref fuse is consulted to
444   possibly inject an error for testing fault scenarios. 437   possibly inject an error for testing fault scenarios.
445   When the fuse fires, the peer is automatically closed. 438   When the fuse fires, the peer is automatically closed.
446   439  
447   @param f The fuse used to inject errors during operations. 440   @param f The fuse used to inject errors during operations.
448   441  
449   @return A pair of connected streams. 442   @return A pair of connected streams.
450   443  
451   @see stream, fuse 444   @see stream, fuse
452   */ 445   */
453   inline std::pair<stream, stream> 446   inline std::pair<stream, stream>
HITCBC 454   279 make_stream_pair(fuse f = {}) 447   280 make_stream_pair(fuse f = {})
455   { 448   {
HITCBC 456   279 auto sp = std::make_shared<stream::state>(std::move(f)); 449   280 auto sp = std::make_shared<stream::state>(std::move(f));
HITCBC 457   558 return {stream(sp, 0), stream(sp, 1)}; 450   560 return {stream(sp, 0), stream(sp, 1)};
HITCBC 458   279 } 451   280 }
459   452  
460   } // test 453   } // test
461   } // capy 454   } // capy
462   } // boost 455   } // boost
463   456  
464   #endif 457   #endif