1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::cmp; 15 use std::io::{IoSlice, SeekFrom}; 16 use std::pin::Pin; 17 use std::task::{Context, Poll}; 18 19 use crate::io::async_buf_read::AsyncBufRead; 20 use crate::io::buffered::DEFAULT_BUF_SIZE; 21 use crate::io::{poll_ready, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; 22 23 /// This is an asynchronous version of [`std::io::BufReader`] 24 /// 25 /// The `AsyncBufReader<R>` struct adds buffering to any reader that implements 26 /// AsyncRead. It is suitable to perform large, infrequent reads on the 27 /// underlying [`AsyncRead`] object and maintains an in-memory buffer of the 28 /// results. 29 /// 30 /// When the `AsyncBufReader<R>` is dropped, the contents inside its buffer will 31 /// be discarded. Creating multiple instances of `AsyncBufReader<R>` on the same 32 /// [`AsyncRead`] stream may cause data loss. 33 pub struct AsyncBufReader<R> { 34 inner: R, 35 buf: Box<[u8]>, 36 pos: usize, 37 filled: usize, 38 } 39 40 impl<R: AsyncRead> AsyncBufReader<R> { 41 /// Creates a new `AsyncBufReader<R>` with a default buffer capacity. 42 /// The default buffer capacity is 8 KB, which is the same as 43 /// [`std::io::BufReader`] 44 /// 45 /// # Examples 46 /// 47 /// ```no run 48 /// use ylong_runtime::fs::File; 49 /// 50 /// async fn main() -> std::io::Result<()> { 51 /// use ylong_runtime::io::AsyncBufReader; 52 /// let f = File::open("test.txt").await?; 53 /// let reader = AsyncBufReader::new(f); 54 /// Ok(()) 55 /// } 56 /// ``` newnull57 pub fn new(inner: R) -> AsyncBufReader<R> { 58 AsyncBufReader::with_capacity(DEFAULT_BUF_SIZE, inner) 59 } 60 61 /// Creates a new `AsyncBufReader<R>` with a specific buffer capacity. 62 /// 63 /// # Examples 64 /// 65 /// ```no run 66 /// use ylong_runtime::fs::File; 67 /// 68 /// async fn main() -> std::io::Result<()> { 69 /// use ylong_runtime::io::AsyncBufReader; 70 /// let f = File::open("test.txt").await?; 71 /// let reader = AsyncBufReader::with_capacity(1000, f); 72 /// Ok(()) 73 /// } with_capacitynull74 pub fn with_capacity(capacity: usize, inner: R) -> AsyncBufReader<R> { 75 AsyncBufReader { 76 inner, 77 buf: vec![0; capacity].into_boxed_slice(), 78 pos: 0, 79 filled: 0, 80 } 81 } 82 } 83 84 impl<R> AsyncBufReader<R> { 85 /// Gets a reference to the underlying reader. 86 /// 87 /// # Examples 88 /// 89 /// ```no run 90 /// use ylong_runtime::fs::File; 91 /// 92 /// async fn main() -> std::io::Result<()> { 93 /// use ylong_runtime::io::AsyncBufReader; 94 /// let f = File::open("test.txt").await?; 95 /// let reader = AsyncBufReader::new(f); 96 /// let reader_ref = reader.get_ref(); 97 /// Ok(()) 98 /// } 99 /// ``` get_refnull100 pub fn get_ref(&self) -> &R { 101 &self.inner 102 } 103 104 /// Gets the mutable reference to the underlying reader. 105 /// 106 /// # Examples 107 /// 108 /// ```no run 109 /// use ylong_runtime::fs::File; 110 /// 111 /// async fn main() -> std::io::Result<()> { 112 /// use ylong_runtime::io::AsyncBufReader; 113 /// let f = File::open("test.txt").await?; 114 /// let mut reader = AsyncBufReader::new(f); 115 /// let reader_ref = reader.get_mut(); 116 /// Ok(()) 117 /// } 118 /// ``` get_mutnull119 pub fn get_mut(&mut self) -> &mut R { 120 &mut self.inner 121 } 122 123 /// Returns a reference to the internally buffered data. 124 /// 125 /// Only returns the filled part of the buffer instead of the whole buffer. 126 /// 127 /// # Examples 128 /// 129 /// ```no run 130 /// use ylong_runtime::fs::File; 131 /// 132 /// async fn main() -> std::io::Result<()> { 133 /// use ylong_runtime::io::AsyncBufReader; 134 /// let f = File::open("test.txt").await?; 135 /// let reader = AsyncBufReader::new(f); 136 /// let read_buf = reader.buffer(); 137 /// assert!(read_buf.is_empty()); 138 /// Ok(()) 139 /// } 140 /// ``` buffernull141 pub fn buffer(&self) -> &[u8] { 142 &self.buf[self.pos..self.filled] 143 } 144 145 /// Returns the capacity of the internal buffer. 146 /// 147 /// # Examples 148 /// 149 /// ```no run 150 /// use ylong_runtime::fs::File; 151 /// 152 /// async fn main() -> std::io::Result<()> { 153 /// use ylong_runtime::io::AsyncBufReader; 154 /// let f = File::open("test.txt").await?; 155 /// let reader = AsyncBufReader::with_capacity(10, f); 156 /// let capacity = reader.capacity(); 157 /// assert_eq!(capacity, 10); 158 /// Ok(()) 159 /// } 160 /// ``` capacitynull161 pub fn capacity(&self) -> usize { 162 self.buf.len() 163 } 164 165 /// Unwraps this `AsyncBufReader<R>`, returning the underlying reader. 166 /// 167 /// Any leftover data inside the internal buffer of the `AsyncBufReader` is 168 /// lost. into_innernull169 pub fn into_inner(self) -> R { 170 self.inner 171 } 172 173 /// Invalidates all data in the internal buffer. discard_buffernull174 fn discard_buffer(&mut self) { 175 self.pos = 0; 176 self.filled = 0; 177 } 178 } 179 180 impl<R: AsyncRead> AsyncRead for AsyncBufReader<R> { poll_readnull181 fn poll_read( 182 mut self: Pin<&mut Self>, 183 cx: &mut Context<'_>, 184 buf: &mut ReadBuf<'_>, 185 ) -> Poll<std::io::Result<()>> { 186 if self.filled == self.pos && buf.remaining() >= self.buf.len() { 187 let this = unsafe { self.get_unchecked_mut() }; 188 this.discard_buffer(); 189 return unsafe { Pin::new_unchecked(&mut this.inner).poll_read(cx, buf) }; 190 } 191 let rem = poll_ready!(self.as_mut().poll_fill_buf(cx))?; 192 let r_len = cmp::min(rem.len(), buf.remaining()); 193 buf.append(&rem[..r_len]); 194 self.as_mut().consume(r_len); 195 196 Poll::Ready(Ok(())) 197 } 198 } 199 200 impl<R: AsyncRead> AsyncBufRead for AsyncBufReader<R> { poll_fill_bufnull201 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> { 202 let this = unsafe { self.get_unchecked_mut() }; 203 if this.pos >= this.filled { 204 let mut read_buf = ReadBuf::new(&mut this.buf); 205 unsafe { 206 poll_ready!(Pin::new_unchecked(&mut this.inner).poll_read(cx, &mut read_buf))?; 207 } 208 this.pos = 0; 209 this.filled = read_buf.filled_len(); 210 } 211 Poll::Ready(Ok(&this.buf[this.pos..this.filled])) 212 } 213 consumenull214 fn consume(self: Pin<&mut Self>, amt: usize) { 215 let this = unsafe { self.get_unchecked_mut() }; 216 this.pos = cmp::min(this.pos + amt, this.filled); 217 } 218 } 219 220 impl<R: AsyncRead + AsyncSeek> AsyncSeek for AsyncBufReader<R> { poll_seeknull221 fn poll_seek( 222 self: Pin<&mut Self>, 223 cx: &mut Context<'_>, 224 pos: SeekFrom, 225 ) -> Poll<std::io::Result<u64>> { 226 let this = unsafe { self.get_unchecked_mut() }; 227 if let SeekFrom::Current(n) = pos { 228 let remainder = (this.filled - this.pos) as i64; 229 if let Some(offset) = n.checked_sub(remainder) { 230 let res = unsafe { 231 poll_ready!(Pin::new_unchecked(&mut this.inner) 232 .poll_seek(cx, SeekFrom::Current(offset)))? 233 }; 234 this.discard_buffer(); 235 return Poll::Ready(Ok(res)); 236 } else { 237 unsafe { 238 poll_ready!(Pin::new_unchecked(&mut this.inner) 239 .poll_seek(cx, SeekFrom::Current(-remainder)))?; 240 this.discard_buffer(); 241 } 242 } 243 } 244 245 let res = unsafe { poll_ready!(Pin::new_unchecked(&mut this.inner).poll_seek(cx, pos))? }; 246 this.discard_buffer(); 247 Poll::Ready(Ok(res)) 248 } 249 } 250 251 impl<R: AsyncRead + AsyncWrite> AsyncWrite for AsyncBufReader<R> { poll_writenull252 fn poll_write( 253 self: Pin<&mut Self>, 254 cx: &mut Context<'_>, 255 buf: &[u8], 256 ) -> Poll<std::io::Result<usize>> { 257 let this = unsafe { self.get_unchecked_mut() }; 258 unsafe { Pin::new_unchecked(&mut this.inner).poll_write(cx, buf) } 259 } 260 poll_write_vectorednull261 fn poll_write_vectored( 262 self: Pin<&mut Self>, 263 cx: &mut Context<'_>, 264 bufs: &[IoSlice<'_>], 265 ) -> Poll<std::io::Result<usize>> { 266 let this = unsafe { self.get_unchecked_mut() }; 267 unsafe { Pin::new_unchecked(&mut this.inner).poll_write_vectored(cx, bufs) } 268 } 269 is_write_vectorednull270 fn is_write_vectored(&self) -> bool { 271 self.inner.is_write_vectored() 272 } 273 poll_flushnull274 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> { 275 let this = unsafe { self.get_unchecked_mut() }; 276 unsafe { Pin::new_unchecked(&mut this.inner).poll_flush(cx) } 277 } 278 poll_shutdownnull279 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> { 280 let this = unsafe { self.get_unchecked_mut() }; 281 unsafe { Pin::new_unchecked(&mut this.inner).poll_shutdown(cx) } 282 } 283 } 284 285 #[cfg(test)] 286 mod test { 287 use crate::io::AsyncBufReader; 288 289 /// UT test cases for `AsyncBufReader`. 290 /// 291 /// # Brief 292 /// 1. create a `AsyncBufReader`. 293 /// 2. check pos and filled. 294 /// 3. set pos and filled. 295 /// 4. call `discard_buffer()` function. 296 /// 5. check pos and filled. 297 #[test] ut_test_stdio_basicnull298 fn ut_test_stdio_basic() { 299 let stdin = crate::io::stdin(); 300 let mut buf = AsyncBufReader::new(stdin); 301 assert_eq!(buf.pos, 0); 302 assert_eq!(buf.filled, 0); 303 buf.pos = 1; 304 buf.filled = 1; 305 buf.discard_buffer(); 306 assert_eq!(buf.pos, 0); 307 assert_eq!(buf.filled, 0); 308 } 309 } 310