1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::cmp;
15 use std::io::{IoSlice, SeekFrom};
16 use std::pin::Pin;
17 use std::task::{Context, Poll};
18 
19 use crate::io::async_buf_read::AsyncBufRead;
20 use crate::io::buffered::DEFAULT_BUF_SIZE;
21 use crate::io::{poll_ready, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
22 
23 /// This is an asynchronous version of [`std::io::BufReader`]
24 ///
25 /// The `AsyncBufReader<R>` struct adds buffering to any reader that implements
26 /// AsyncRead. It is suitable to perform large, infrequent reads on the
27 /// underlying [`AsyncRead`] object and maintains an in-memory buffer of the
28 /// results.
29 ///
30 /// When the `AsyncBufReader<R>` is dropped, the contents inside its buffer will
31 /// be discarded. Creating multiple instances of `AsyncBufReader<R>` on the same
32 /// [`AsyncRead`] stream may cause data loss.
33 pub struct AsyncBufReader<R> {
34     inner: R,
35     buf: Box<[u8]>,
36     pos: usize,
37     filled: usize,
38 }
39 
40 impl<R: AsyncRead> AsyncBufReader<R> {
41     /// Creates a new `AsyncBufReader<R>` with a default buffer capacity.
42     /// The default buffer capacity is 8 KB, which is the same as
43     /// [`std::io::BufReader`]
44     ///
45     /// # Examples
46     ///
47     /// ```no run
48     /// use ylong_runtime::fs::File;
49     ///
50     /// async fn main() -> std::io::Result<()> {
51     ///     use ylong_runtime::io::AsyncBufReader;
52     ///     let f = File::open("test.txt").await?;
53     ///     let reader = AsyncBufReader::new(f);
54     ///     Ok(())
55     /// }
56     /// ```
newnull57     pub fn new(inner: R) -> AsyncBufReader<R> {
58         AsyncBufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
59     }
60 
61     /// Creates a new `AsyncBufReader<R>` with a specific buffer capacity.
62     ///
63     /// # Examples
64     ///
65     /// ```no run
66     /// use ylong_runtime::fs::File;
67     ///
68     /// async fn main() -> std::io::Result<()> {
69     ///     use ylong_runtime::io::AsyncBufReader;
70     ///     let f = File::open("test.txt").await?;
71     ///     let reader = AsyncBufReader::with_capacity(1000, f);
72     ///     Ok(())
73     /// }
with_capacitynull74     pub fn with_capacity(capacity: usize, inner: R) -> AsyncBufReader<R> {
75         AsyncBufReader {
76             inner,
77             buf: vec![0; capacity].into_boxed_slice(),
78             pos: 0,
79             filled: 0,
80         }
81     }
82 }
83 
84 impl<R> AsyncBufReader<R> {
85     /// Gets a reference to the underlying reader.
86     ///
87     /// # Examples
88     ///
89     /// ```no run
90     /// use ylong_runtime::fs::File;
91     ///
92     /// async fn main() -> std::io::Result<()> {
93     ///     use ylong_runtime::io::AsyncBufReader;
94     ///     let f = File::open("test.txt").await?;
95     ///     let reader = AsyncBufReader::new(f);
96     ///     let reader_ref = reader.get_ref();
97     ///     Ok(())
98     /// }
99     /// ```
get_refnull100     pub fn get_ref(&self) -> &R {
101         &self.inner
102     }
103 
104     /// Gets the mutable reference to the underlying reader.
105     ///
106     /// # Examples
107     ///
108     /// ```no run
109     /// use ylong_runtime::fs::File;
110     ///
111     /// async fn main() -> std::io::Result<()> {
112     ///     use ylong_runtime::io::AsyncBufReader;
113     ///     let f = File::open("test.txt").await?;
114     ///     let mut reader = AsyncBufReader::new(f);
115     ///     let reader_ref = reader.get_mut();
116     ///     Ok(())
117     /// }
118     /// ```
get_mutnull119     pub fn get_mut(&mut self) -> &mut R {
120         &mut self.inner
121     }
122 
123     /// Returns a reference to the internally buffered data.
124     ///
125     /// Only returns the filled part of the buffer instead of the whole buffer.
126     ///
127     /// # Examples
128     ///
129     /// ```no run
130     /// use ylong_runtime::fs::File;
131     ///
132     /// async fn main() -> std::io::Result<()> {
133     ///     use ylong_runtime::io::AsyncBufReader;
134     ///     let f = File::open("test.txt").await?;
135     ///     let reader = AsyncBufReader::new(f);
136     ///     let read_buf = reader.buffer();
137     ///     assert!(read_buf.is_empty());
138     ///     Ok(())
139     /// }
140     /// ```
buffernull141     pub fn buffer(&self) -> &[u8] {
142         &self.buf[self.pos..self.filled]
143     }
144 
145     /// Returns the capacity of the internal buffer.
146     ///
147     /// # Examples
148     ///
149     /// ```no run
150     /// use ylong_runtime::fs::File;
151     ///
152     /// async fn main() -> std::io::Result<()> {
153     ///     use ylong_runtime::io::AsyncBufReader;
154     ///     let f = File::open("test.txt").await?;
155     ///     let reader = AsyncBufReader::with_capacity(10, f);
156     ///     let capacity = reader.capacity();
157     ///     assert_eq!(capacity, 10);
158     ///     Ok(())
159     /// }
160     /// ```
capacitynull161     pub fn capacity(&self) -> usize {
162         self.buf.len()
163     }
164 
165     /// Unwraps this `AsyncBufReader<R>`, returning the underlying reader.
166     ///
167     /// Any leftover data inside the internal buffer of the `AsyncBufReader` is
168     /// lost.
into_innernull169     pub fn into_inner(self) -> R {
170         self.inner
171     }
172 
173     /// Invalidates all data in the internal buffer.
discard_buffernull174     fn discard_buffer(&mut self) {
175         self.pos = 0;
176         self.filled = 0;
177     }
178 }
179 
180 impl<R: AsyncRead> AsyncRead for AsyncBufReader<R> {
poll_readnull181     fn poll_read(
182         mut self: Pin<&mut Self>,
183         cx: &mut Context<'_>,
184         buf: &mut ReadBuf<'_>,
185     ) -> Poll<std::io::Result<()>> {
186         if self.filled == self.pos && buf.remaining() >= self.buf.len() {
187             let this = unsafe { self.get_unchecked_mut() };
188             this.discard_buffer();
189             return unsafe { Pin::new_unchecked(&mut this.inner).poll_read(cx, buf) };
190         }
191         let rem = poll_ready!(self.as_mut().poll_fill_buf(cx))?;
192         let r_len = cmp::min(rem.len(), buf.remaining());
193         buf.append(&rem[..r_len]);
194         self.as_mut().consume(r_len);
195 
196         Poll::Ready(Ok(()))
197     }
198 }
199 
200 impl<R: AsyncRead> AsyncBufRead for AsyncBufReader<R> {
poll_fill_bufnull201     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
202         let this = unsafe { self.get_unchecked_mut() };
203         if this.pos >= this.filled {
204             let mut read_buf = ReadBuf::new(&mut this.buf);
205             unsafe {
206                 poll_ready!(Pin::new_unchecked(&mut this.inner).poll_read(cx, &mut read_buf))?;
207             }
208             this.pos = 0;
209             this.filled = read_buf.filled_len();
210         }
211         Poll::Ready(Ok(&this.buf[this.pos..this.filled]))
212     }
213 
consumenull214     fn consume(self: Pin<&mut Self>, amt: usize) {
215         let this = unsafe { self.get_unchecked_mut() };
216         this.pos = cmp::min(this.pos + amt, this.filled);
217     }
218 }
219 
220 impl<R: AsyncRead + AsyncSeek> AsyncSeek for AsyncBufReader<R> {
poll_seeknull221     fn poll_seek(
222         self: Pin<&mut Self>,
223         cx: &mut Context<'_>,
224         pos: SeekFrom,
225     ) -> Poll<std::io::Result<u64>> {
226         let this = unsafe { self.get_unchecked_mut() };
227         if let SeekFrom::Current(n) = pos {
228             let remainder = (this.filled - this.pos) as i64;
229             if let Some(offset) = n.checked_sub(remainder) {
230                 let res = unsafe {
231                     poll_ready!(Pin::new_unchecked(&mut this.inner)
232                         .poll_seek(cx, SeekFrom::Current(offset)))?
233                 };
234                 this.discard_buffer();
235                 return Poll::Ready(Ok(res));
236             } else {
237                 unsafe {
238                     poll_ready!(Pin::new_unchecked(&mut this.inner)
239                         .poll_seek(cx, SeekFrom::Current(-remainder)))?;
240                     this.discard_buffer();
241                 }
242             }
243         }
244 
245         let res = unsafe { poll_ready!(Pin::new_unchecked(&mut this.inner).poll_seek(cx, pos))? };
246         this.discard_buffer();
247         Poll::Ready(Ok(res))
248     }
249 }
250 
251 impl<R: AsyncRead + AsyncWrite> AsyncWrite for AsyncBufReader<R> {
poll_writenull252     fn poll_write(
253         self: Pin<&mut Self>,
254         cx: &mut Context<'_>,
255         buf: &[u8],
256     ) -> Poll<std::io::Result<usize>> {
257         let this = unsafe { self.get_unchecked_mut() };
258         unsafe { Pin::new_unchecked(&mut this.inner).poll_write(cx, buf) }
259     }
260 
poll_write_vectorednull261     fn poll_write_vectored(
262         self: Pin<&mut Self>,
263         cx: &mut Context<'_>,
264         bufs: &[IoSlice<'_>],
265     ) -> Poll<std::io::Result<usize>> {
266         let this = unsafe { self.get_unchecked_mut() };
267         unsafe { Pin::new_unchecked(&mut this.inner).poll_write_vectored(cx, bufs) }
268     }
269 
is_write_vectorednull270     fn is_write_vectored(&self) -> bool {
271         self.inner.is_write_vectored()
272     }
273 
poll_flushnull274     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
275         let this = unsafe { self.get_unchecked_mut() };
276         unsafe { Pin::new_unchecked(&mut this.inner).poll_flush(cx) }
277     }
278 
poll_shutdownnull279     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
280         let this = unsafe { self.get_unchecked_mut() };
281         unsafe { Pin::new_unchecked(&mut this.inner).poll_shutdown(cx) }
282     }
283 }
284 
285 #[cfg(test)]
286 mod test {
287     use crate::io::AsyncBufReader;
288 
289     /// UT test cases for `AsyncBufReader`.
290     ///
291     /// # Brief
292     /// 1. create a `AsyncBufReader`.
293     /// 2. check pos and filled.
294     /// 3. set pos and filled.
295     /// 4. call `discard_buffer()` function.
296     /// 5. check pos and filled.
297     #[test]
ut_test_stdio_basicnull298     fn ut_test_stdio_basic() {
299         let stdin = crate::io::stdin();
300         let mut buf = AsyncBufReader::new(stdin);
301         assert_eq!(buf.pos, 0);
302         assert_eq!(buf.filled, 0);
303         buf.pos = 1;
304         buf.filled = 1;
305         buf.discard_buffer();
306         assert_eq!(buf.pos, 0);
307         assert_eq!(buf.filled, 0);
308     }
309 }
310