1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use core::pin::Pin;
15 use core::task::{Context, Poll};
16 use std::future::Future;
17 use std::io::{Cursor, Read};
18 use std::sync::Arc;
19 
20 use ylong_http::body::async_impl::Body;
21 use ylong_http::body::TextBodyDecoder;
22 #[cfg(feature = "http1_1")]
23 use ylong_http::body::{ChunkBodyDecoder, ChunkState};
24 use ylong_http::headers::Headers;
25 
26 use super::conn::StreamData;
27 use crate::async_impl::interceptor::Interceptors;
28 use crate::error::{ErrorKind, HttpClientError};
29 use crate::runtime::{AsyncRead, ReadBuf, Sleep};
30 use crate::util::normalizer::BodyLength;
31 
32 const TRAILER_SIZE: usize = 1024;
33 
34 /// `HttpBody` is the body part of the `Response` returned by `Client::request`.
35 /// `HttpBody` implements `Body` trait, so users can call related methods to get
36 /// body data.
37 ///
38 /// # Examples
39 ///
40 /// ```no_run
41 /// use ylong_http_client::async_impl::{Body, Client, HttpBody, Request};
42 /// use ylong_http_client::HttpClientError;
43 ///
44 /// async fn read_body() -> Result<(), HttpClientError> {
45 ///     let client = Client::new();
46 ///
47 ///     // `HttpBody` is the body part of `response`.
48 ///     let mut response = client
49 ///         .request(Request::builder().body(Body::empty())?)
50 ///         .await?;
51 ///
52 ///     // Users can use `Body::data` to get body data.
53 ///     let mut buf = [0u8; 1024];
54 ///     loop {
55 ///         let size = response.data(&mut buf).await.unwrap();
56 ///         if size == 0 {
57 ///             break;
58 ///         }
59 ///         let _data = &buf[..size];
60 ///         // Deals with the data.
61 ///     }
62 ///     Ok(())
63 /// }
64 /// ```
65 pub struct HttpBody {
66     kind: Kind,
67     sleep: Option<Pin<Box<Sleep>>>,
68 }
69 
70 type BoxStreamData = Box<dyn StreamData + Sync + Send + Unpin>;
71 
72 impl HttpBody {
73     pub(crate) fn new(
74         interceptors: Arc<Interceptors>,
75         body_length: BodyLength,
76         io: BoxStreamData,
77         pre: &[u8],
78     ) -> Result<Self, HttpClientError> {
79         let kind = match body_length {
80             BodyLength::Empty => {
81                 if !pre.is_empty() {
82                     // TODO: Consider the case where BodyLength is empty but pre is not empty.
83                     io.shutdown();
84                     return err_from_msg!(Request, "Body length is 0 but read extra data");
85                 }
86                 Kind::Empty
87             }
88             BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io, interceptors)),
89             BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io, interceptors)),
90 
91             #[cfg(feature = "http1_1")]
92             BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io, interceptors)),
93         };
94         Ok(Self { kind, sleep: None })
95     }
96 
97     pub(crate) fn set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) {
98         self.sleep = sleep;
99     }
100 }
101 
102 impl Body for HttpBody {
103     type Error = HttpClientError;
104 
poll_datanull105     fn poll_data(
106         mut self: Pin<&mut Self>,
107         cx: &mut Context<'_>,
108         buf: &mut [u8],
109     ) -> Poll<Result<usize, Self::Error>> {
110         if buf.is_empty() {
111             return Poll::Ready(Ok(0));
112         }
113 
114         if let Some(delay) = self.sleep.as_mut() {
115             if let Poll::Ready(()) = Pin::new(delay).poll(cx) {
116                 return Poll::Ready(err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into()));
117             }
118         }
119 
120         match self.kind {
121             Kind::Empty => Poll::Ready(Ok(0)),
122             Kind::Text(ref mut text) => text.data(cx, buf),
123             Kind::UntilClose(ref mut until_close) => until_close.data(cx, buf),
124             #[cfg(feature = "http1_1")]
125             Kind::Chunk(ref mut chunk) => chunk.data(cx, buf),
126         }
127     }
128 
poll_trailernull129     fn poll_trailer(
130         mut self: Pin<&mut Self>,
131         cx: &mut Context<'_>,
132     ) -> Poll<Result<Option<Headers>, Self::Error>> {
133         // Get trailer data from io
134         if let Some(delay) = self.sleep.as_mut() {
135             if let Poll::Ready(()) = Pin::new(delay).poll(cx) {
136                 return Poll::Ready(err_from_msg!(Timeout, "Request timeout"));
137             }
138         }
139 
140         let mut read_buf = [0_u8; TRAILER_SIZE];
141 
142         match self.kind {
143             #[cfg(feature = "http1_1")]
144             Kind::Chunk(ref mut chunk) => {
145                 match chunk.data(cx, &mut read_buf) {
146                     Poll::Ready(Ok(_)) => {}
147                     Poll::Pending => {
148                         return Poll::Pending;
149                     }
150                     Poll::Ready(Err(e)) => {
151                         return Poll::Ready(Err(e));
152                     }
153                 }
154                 Poll::Ready(Ok(chunk.decoder.get_trailer().map_err(|e| {
155                     HttpClientError::from_error(ErrorKind::BodyDecode, e)
156                 })?))
157             }
158             _ => Poll::Ready(Ok(None)),
159         }
160     }
161 }
162 
163 impl Drop for HttpBody {
dropnull164     fn drop(&mut self) {
165         let io = match self.kind {
166             Kind::Text(ref mut text) => text.io.as_mut(),
167             #[cfg(feature = "http1_1")]
168             Kind::Chunk(ref mut chunk) => chunk.io.as_mut(),
169             Kind::UntilClose(ref mut until_close) => until_close.io.as_mut(),
170             _ => None,
171         };
172         // If response body is not totally read, shutdown io.
173         if let Some(io) = io {
174             io.shutdown()
175         }
176     }
177 }
178 
179 // TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation.
180 enum Kind {
181     Empty,
182     Text(Text),
183     #[cfg(feature = "http1_1")]
184     Chunk(Chunk),
185     UntilClose(UntilClose),
186 }
187 
188 struct UntilClose {
189     interceptors: Arc<Interceptors>,
190     pre: Option<Cursor<Vec<u8>>>,
191     io: Option<BoxStreamData>,
192 }
193 
194 impl UntilClose {
195     pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self {
196         Self {
197             interceptors,
198             pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())),
199             io: Some(io),
200         }
201     }
202 
datanull203     fn data(
204         &mut self,
205         cx: &mut Context<'_>,
206         buf: &mut [u8],
207     ) -> Poll<Result<usize, HttpClientError>> {
208         if buf.is_empty() {
209             return Poll::Ready(Ok(0));
210         }
211         let mut read = 0;
212         if let Some(pre) = self.pre.as_mut() {
213             // Here cursor read never failed.
214             let this_read = Read::read(pre, buf).unwrap();
215             if this_read == 0 {
216                 self.pre = None;
217             } else {
218                 read += this_read;
219             }
220         }
221 
222         if !buf[read..].is_empty() {
223             if let Some(io) = self.io.take() {
224                 return self.poll_read_io(cx, io, read, buf);
225             }
226         }
227         Poll::Ready(Ok(read))
228     }
229 
poll_read_ionull230     fn poll_read_io(
231         &mut self,
232         cx: &mut Context<'_>,
233         mut io: BoxStreamData,
234         read: usize,
235         buf: &mut [u8],
236     ) -> Poll<Result<usize, HttpClientError>> {
237         let mut read = read;
238         let mut read_buf = ReadBuf::new(&mut buf[read..]);
239         match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
240             // Disconnected.
241             Poll::Ready(Ok(())) => {
242                 let filled = read_buf.filled().len();
243                 if filled == 0 {
244                     io.shutdown();
245                 } else {
246                     self.interceptors
247                         .intercept_output(&buf[read..(read + filled)])?;
248                     self.io = Some(io);
249                 }
250                 read += filled;
251                 Poll::Ready(Ok(read))
252             }
253             Poll::Pending => {
254                 self.io = Some(io);
255                 if read != 0 {
256                     return Poll::Ready(Ok(read));
257                 }
258                 Poll::Pending
259             }
260             Poll::Ready(Err(e)) => {
261                 // If IO error occurs, shutdowns `io` before return.
262                 io.shutdown();
263                 Poll::Ready(err_from_io!(BodyTransfer, e))
264             }
265         }
266     }
267 }
268 
269 struct Text {
270     interceptors: Arc<Interceptors>,
271     decoder: TextBodyDecoder,
272     pre: Option<Cursor<Vec<u8>>>,
273     io: Option<BoxStreamData>,
274 }
275 
276 impl Text {
277     pub(crate) fn new(
278         len: u64,
279         pre: &[u8],
280         io: BoxStreamData,
281         interceptors: Arc<Interceptors>,
282     ) -> Self {
283         Self {
284             interceptors,
285             decoder: TextBodyDecoder::new(len),
286             pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())),
287             io: Some(io),
288         }
289     }
290 }
291 
292 impl Text {
datanull293     fn data(
294         &mut self,
295         cx: &mut Context<'_>,
296         buf: &mut [u8],
297     ) -> Poll<Result<usize, HttpClientError>> {
298         if buf.is_empty() {
299             return Poll::Ready(Ok(0));
300         }
301 
302         let mut read = 0;
303 
304         if let Some(pre) = self.pre.as_mut() {
305             // Here cursor read never failed.
306             let this_read = Read::read(pre, buf).unwrap();
307             if this_read == 0 {
308                 self.pre = None;
309             } else {
310                 read += this_read;
311                 if let Some(result) = self.read_remaining(buf, read) {
312                     return result;
313                 }
314             }
315         }
316 
317         if !buf[read..].is_empty() {
318             if let Some(io) = self.io.take() {
319                 return self.poll_read_io(cx, buf, io, read);
320             }
321         }
322         Poll::Ready(Ok(read))
323     }
324 
read_remainingnull325     fn read_remaining(
326         &mut self,
327         buf: &mut [u8],
328         read: usize,
329     ) -> Option<Poll<Result<usize, HttpClientError>>> {
330         let (text, rem) = self.decoder.decode(&buf[..read]);
331 
332         // Contains redundant `rem`, return error.
333         match (text.is_complete(), rem.is_empty()) {
334             (true, false) => {
335                 if let Some(io) = self.io.take() {
336                     io.shutdown();
337                 };
338                 Some(Poll::Ready(err_from_msg!(BodyDecode, "Not eof")))
339             }
340             (true, true) => {
341                 if let Some(io) = self.io.take() {
342                     // stream not closed, waiting for the fin
343                     if !io.is_stream_closable() {
344                         self.io = Some(io);
345                     }
346                 }
347                 Some(Poll::Ready(Ok(read)))
348             }
349             // TextBodyDecoder decodes as much as possible here.
350             _ => None,
351         }
352     }
353 
poll_read_ionull354     fn poll_read_io(
355         &mut self,
356         cx: &mut Context<'_>,
357         buf: &mut [u8],
358         mut io: BoxStreamData,
359         read: usize,
360     ) -> Poll<Result<usize, HttpClientError>> {
361         let mut read = read;
362         let mut read_buf = ReadBuf::new(&mut buf[read..]);
363         match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
364             // Disconnected.
365             Poll::Ready(Ok(())) => {
366                 let filled = read_buf.filled().len();
367                 if filled == 0 {
368                     // stream closed, and get the fin
369                     if io.is_stream_closable() && self.decoder.decode(&buf[..0]).0.is_complete() {
370                         return Poll::Ready(Ok(0));
371                     }
372                     io.shutdown();
373                     return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete"));
374                 }
375                 let (text, rem) = self.decoder.decode(read_buf.filled());
376                 self.interceptors.intercept_output(read_buf.filled())?;
377                 read += filled;
378                 // Contains redundant `rem`, return error.
379                 match (text.is_complete(), rem.is_empty()) {
380                     (true, false) => {
381                         io.shutdown();
382                         Poll::Ready(err_from_msg!(BodyDecode, "Not eof"))
383                     }
384                     (true, true) => {
385                         if !io.is_stream_closable() {
386                             // stream not closed, waiting for the fin
387                             self.io = Some(io);
388                         }
389                         Poll::Ready(Ok(read))
390                     }
391                     _ => {
392                         self.io = Some(io);
393                         Poll::Ready(Ok(read))
394                     }
395                 }
396             }
397             Poll::Pending => {
398                 self.io = Some(io);
399                 if read != 0 {
400                     return Poll::Ready(Ok(read));
401                 }
402                 Poll::Pending
403             }
404             Poll::Ready(Err(e)) => {
405                 // If IO error occurs, shutdowns `io` before return.
406                 io.shutdown();
407                 Poll::Ready(err_from_io!(BodyDecode, e))
408             }
409         }
410     }
411 }
412 
413 #[cfg(feature = "http1_1")]
414 struct Chunk {
415     interceptors: Arc<Interceptors>,
416     decoder: ChunkBodyDecoder,
417     pre: Option<Cursor<Vec<u8>>>,
418     io: Option<BoxStreamData>,
419 }
420 
421 #[cfg(feature = "http1_1")]
422 impl Chunk {
423     pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self {
424         Self {
425             interceptors,
426             decoder: ChunkBodyDecoder::new().contains_trailer(true),
427             pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())),
428             io: Some(io),
429         }
430     }
431 }
432 
433 #[cfg(feature = "http1_1")]
434 impl Chunk {
datanull435     fn data(
436         &mut self,
437         cx: &mut Context<'_>,
438         buf: &mut [u8],
439     ) -> Poll<Result<usize, HttpClientError>> {
440         if buf.is_empty() {
441             return Poll::Ready(Ok(0));
442         }
443 
444         let mut read = 0;
445 
446         while let Some(pre) = self.pre.as_mut() {
447             // Here cursor read never failed.
448             let size = Read::read(pre, &mut buf[read..]).unwrap();
449             if size == 0 {
450                 self.pre = None;
451             }
452 
453             let (size, flag) = self.merge_chunks(&mut buf[read..read + size])?;
454             read += size;
455 
456             if flag {
457                 // Return if we find a 0-sized chunk.
458                 self.io = None;
459                 return Poll::Ready(Ok(read));
460             } else if read != 0 {
461                 // Return if we get some data.
462                 return Poll::Ready(Ok(read));
463             }
464         }
465 
466         // Here `read` must be 0.
467         while let Some(mut io) = self.io.take() {
468             let mut read_buf = ReadBuf::new(&mut buf[read..]);
469             match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
470                 Poll::Ready(Ok(())) => {
471                     let filled = read_buf.filled().len();
472                     if filled == 0 {
473                         io.shutdown();
474                         return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete"));
475                     }
476                     let (size, flag) = self.merge_chunks(read_buf.filled_mut())?;
477                     self.interceptors.intercept_output(read_buf.filled_mut())?;
478                     read += size;
479                     if flag {
480                         // Return if we find a 0-sized chunk.
481                         // Return if we get some data.
482                         return Poll::Ready(Ok(read));
483                     }
484                     self.io = Some(io);
485                     if read != 0 {
486                         return Poll::Ready(Ok(read));
487                     }
488                 }
489                 Poll::Pending => {
490                     self.io = Some(io);
491                     return Poll::Pending;
492                 }
493                 Poll::Ready(Err(e)) => {
494                     // If IO error occurs, shutdowns `io` before return.
495                     io.shutdown();
496                     return Poll::Ready(err_from_io!(BodyDecode, e));
497                 }
498             }
499         }
500 
501         Poll::Ready(Ok(read))
502     }
503 
merge_chunksnull504     fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> {
505         // Here we need to merge the chunks into one data block and return.
506         // The data arrangement in buf is as follows:
507         //
508         // data in buf:
509         // +------+------+------+------+------+------+------+
510         // | data | len  | data | len  |  ... | data |  len |
511         // +------+------+------+------+------+------+------+
512         //
513         // We need to merge these data blocks into one block:
514         //
515         // after merge:
516         // +---------------------------+
517         // |            data           |
518         // +---------------------------+
519 
520         let (chunks, junk) = self
521             .decoder
522             .decode(buf)
523             .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?;
524 
525         let mut finished = false;
526         let mut ptrs = Vec::new();
527 
528         for chunk in chunks.into_iter() {
529             if chunk.trailer().is_some() {
530                 if chunk.state() == &ChunkState::Finish {
531                     finished = true;
532                 }
533             } else {
534                 if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize {
535                     finished = true;
536                     break;
537                 }
538                 let data = chunk.data();
539                 ptrs.push((data.as_ptr(), data.len()))
540             }
541         }
542 
543         if finished && !junk.is_empty() {
544             return err_from_msg!(BodyDecode, "Invalid chunk body");
545         }
546 
547         let start = buf.as_ptr();
548 
549         let mut idx = 0;
550         for (ptr, len) in ptrs.into_iter() {
551             let st = ptr as usize - start as usize;
552             let ed = st + len;
553             buf.copy_within(st..ed, idx);
554             idx += len;
555         }
556         Ok((idx, finished))
557     }
558 }
559 
560 #[cfg(feature = "ylong_base")]
561 #[cfg(test)]
562 mod ut_async_http_body {
563     use std::sync::Arc;
564 
565     use ylong_http::body::async_impl;
566 
567     use crate::async_impl::interceptor::IdleInterceptor;
568     use crate::async_impl::HttpBody;
569     use crate::util::normalizer::BodyLength;
570     use crate::ErrorKind;
571 
572     /// UT test cases for `HttpBody::trailer`.
573     ///
574     /// # Brief
575     /// 1. Creates a `HttpBody` by calling `HttpBody::new`.
576     /// 2. Calls `trailer` to get headers.
577     /// 3. Checks if the test result is correct.
578     #[test]
ut_asnyc_chunk_trailer_1null579     fn ut_asnyc_chunk_trailer_1() {
580         let handle = ylong_runtime::spawn(async move {
581             async_chunk_trailer_1().await;
582             async_chunk_trailer_2().await;
583         });
584         ylong_runtime::block_on(handle).unwrap();
585     }
586 
587     async fn async_chunk_trailer_1() {
588         let box_stream = Box::new("".as_bytes());
589         let chunk_body_bytes = "\
590             5\r\n\
591             hello\r\n\
592             C ; type = text ;end = !\r\n\
593             hello world!\r\n\
594             000; message = last\r\n\
595             accept:text/html\r\n\r\n\
596             ";
597         let mut chunk = HttpBody::new(
598             Arc::new(IdleInterceptor),
599             BodyLength::Chunk,
600             box_stream,
601             chunk_body_bytes.as_bytes(),
602         )
603         .unwrap();
604         let res = async_impl::Body::trailer(&mut chunk)
605             .await
606             .unwrap()
607             .unwrap();
608         assert_eq!(
609             res.get("accept").unwrap().to_string().unwrap(),
610             "text/html".to_string()
611         );
612         let box_stream = Box::new("".as_bytes());
613         let chunk_body_no_trailer_bytes = "\
614             5\r\n\
615             hello\r\n\
616             C ; type = text ;end = !\r\n\
617             hello world!\r\n\
618             0\r\n\r\n\
619             ";
620 
621         let mut chunk = HttpBody::new(
622             Arc::new(IdleInterceptor),
623             BodyLength::Chunk,
624             box_stream,
625             chunk_body_no_trailer_bytes.as_bytes(),
626         )
627         .unwrap();
628 
629         let mut buf = [0u8; 32];
630         // Read body part
631         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
632         assert_eq!(read, 5);
633         assert_eq!(&buf[..read], b"hello");
634         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
635         assert_eq!(read, 12);
636         assert_eq!(&buf[..read], b"hello world!");
637         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
638         assert_eq!(read, 0);
639         assert_eq!(&buf[..read], b"");
640         // try read trailer part
641         let res = async_impl::Body::trailer(&mut chunk).await.unwrap();
642         assert!(res.is_none());
643     }
644 
645     async fn async_chunk_trailer_2() {
646         let box_stream = Box::new("".as_bytes());
647         let chunk_body_bytes = "\
648             5\r\n\
649             hello\r\n\
650             C ; type = text ;end = !\r\n\
651             hello world!\r\n\
652             000; message = last\r\n\
653             Expires: Wed, 21 Oct 2015 07:27:00 GMT \r\n\r\n\
654             ";
655         let mut chunk = HttpBody::new(
656             Arc::new(IdleInterceptor),
657             BodyLength::Chunk,
658             box_stream,
659             chunk_body_bytes.as_bytes(),
660         )
661         .unwrap();
662         let res = async_impl::Body::trailer(&mut chunk)
663             .await
664             .unwrap()
665             .unwrap();
666         assert_eq!(
667             res.get("expires").unwrap().to_string().unwrap(),
668             "Wed, 21 Oct 2015 07:27:00 GMT".to_string()
669         );
670     }
671 
672     /// UT test cases for `Body::data`.
673     ///
674     /// # Brief
675     /// 1. Creates a chunk `HttpBody`.
676     /// 2. Calls `data` method get boxstream.
677     /// 3. Checks if data size is correct.
678     #[test]
ut_asnyc_http_body_chunk2null679     fn ut_asnyc_http_body_chunk2() {
680         let handle = ylong_runtime::spawn(async move {
681             http_body_chunk2().await;
682         });
683         ylong_runtime::block_on(handle).unwrap();
684     }
685 
686     async fn http_body_chunk2() {
687         let box_stream = Box::new(
688             "\
689             5\r\n\
690             hello\r\n\
691             C ; type = text ;end = !\r\n\
692             hello world!\r\n\
693             000; message = last\r\n\
694             accept:text/html\r\n\r\n\
695         "
696             .as_bytes(),
697         );
698         let chunk_body_bytes = "";
699         let mut chunk = HttpBody::new(
700             Arc::new(IdleInterceptor),
701             BodyLength::Chunk,
702             box_stream,
703             chunk_body_bytes.as_bytes(),
704         )
705         .unwrap();
706 
707         let mut buf = [0u8; 32];
708         // Read body part
709         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
710         assert_eq!(read, 5);
711 
712         let box_stream = Box::new("".as_bytes());
713         let chunk_body_no_trailer_bytes = "\
714             5\r\n\
715             hello\r\n\
716             C ; type = text ;end = !\r\n\
717             hello world!\r\n\
718             0\r\n\r\n\
719             ";
720 
721         let mut chunk = HttpBody::new(
722             Arc::new(IdleInterceptor),
723             BodyLength::Chunk,
724             box_stream,
725             chunk_body_no_trailer_bytes.as_bytes(),
726         )
727         .unwrap();
728 
729         let mut buf = [0u8; 32];
730         // Read body part
731         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
732         assert_eq!(read, 5);
733         assert_eq!(&buf[..read], b"hello");
734         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
735         assert_eq!(read, 12);
736         assert_eq!(&buf[..read], b"hello world!");
737         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
738         assert_eq!(read, 0);
739         assert_eq!(&buf[..read], b"");
740         let res = async_impl::Body::trailer(&mut chunk).await.unwrap();
741         assert!(res.is_none());
742     }
743 
744     /// UT test cases for `Body::data`.
745     ///
746     /// # Brief
747     /// 1. Creates a empty `HttpBody`.
748     /// 2. Calls `HttpBody::new` to create empty http body.
749     /// 3. Checks if http body is empty.
750     #[test]
http_body_empty_errnull751     fn http_body_empty_err() {
752         let box_stream = Box::new("".as_bytes());
753         let content_bytes = "hello";
754 
755         match HttpBody::new(
756             Arc::new(IdleInterceptor),
757             BodyLength::Empty,
758             box_stream,
759             content_bytes.as_bytes(),
760         ) {
761             Ok(_) => (),
762             Err(e) => assert_eq!(e.error_kind(), ErrorKind::Request),
763         }
764     }
765 
766     /// UT test cases for text `HttpBody::new`.
767     ///
768     /// # Brief
769     /// 1. Creates a text `HttpBody`.
770     /// 2. Calls `HttpBody::new` to create text http body.
771     /// 3. Checks if result is correct.
772     #[test]
ut_http_body_textnull773     fn ut_http_body_text() {
774         let handle = ylong_runtime::spawn(async move {
775             http_body_text().await;
776         });
777         ylong_runtime::block_on(handle).unwrap();
778     }
779 
780     async fn http_body_text() {
781         let box_stream = Box::new("hello world".as_bytes());
782         let content_bytes = "";
783 
784         let mut text = HttpBody::new(
785             Arc::new(IdleInterceptor),
786             BodyLength::Length(11),
787             box_stream,
788             content_bytes.as_bytes(),
789         )
790         .unwrap();
791 
792         let mut buf = [0u8; 5];
793         // Read body part
794         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
795         assert_eq!(read, 5);
796         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
797         assert_eq!(read, 5);
798         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
799         assert_eq!(read, 1);
800         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
801         assert_eq!(read, 0);
802 
803         let box_stream = Box::new("".as_bytes());
804         let content_bytes = "hello";
805 
806         let mut text = HttpBody::new(
807             Arc::new(IdleInterceptor),
808             BodyLength::Length(5),
809             box_stream,
810             content_bytes.as_bytes(),
811         )
812         .unwrap();
813 
814         let mut buf = [0u8; 32];
815         // Read body part
816         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
817         assert_eq!(read, 5);
818         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
819         assert_eq!(read, 0);
820     }
821 
822     /// UT test cases for until_close `HttpBody::new`.
823     ///
824     /// # Brief
825     /// 1. Creates a until_close `HttpBody`.
826     /// 2. Calls `HttpBody::new` to create until_close http body.
827     /// 3. Checks if result is correct.
828     #[test]
ut_http_body_until_closenull829     fn ut_http_body_until_close() {
830         let handle = ylong_runtime::spawn(async move {
831             http_body_until_close().await;
832         });
833         ylong_runtime::block_on(handle).unwrap();
834     }
835 
836     async fn http_body_until_close() {
837         let box_stream = Box::new("hello world".as_bytes());
838         let content_bytes = "";
839 
840         let mut until_close = HttpBody::new(
841             Arc::new(IdleInterceptor),
842             BodyLength::UntilClose,
843             box_stream,
844             content_bytes.as_bytes(),
845         )
846         .unwrap();
847 
848         let mut buf = [0u8; 5];
849         // Read body part
850         let read = async_impl::Body::data(&mut until_close, &mut buf)
851             .await
852             .unwrap();
853         assert_eq!(read, 5);
854         let read = async_impl::Body::data(&mut until_close, &mut buf)
855             .await
856             .unwrap();
857         assert_eq!(read, 5);
858         let read = async_impl::Body::data(&mut until_close, &mut buf)
859             .await
860             .unwrap();
861         assert_eq!(read, 1);
862 
863         let box_stream = Box::new("".as_bytes());
864         let content_bytes = "hello";
865 
866         let mut until_close = HttpBody::new(
867             Arc::new(IdleInterceptor),
868             BodyLength::UntilClose,
869             box_stream,
870             content_bytes.as_bytes(),
871         )
872         .unwrap();
873 
874         let mut buf = [0u8; 5];
875         // Read body part
876         let read = async_impl::Body::data(&mut until_close, &mut buf)
877             .await
878             .unwrap();
879         assert_eq!(read, 5);
880         let read = async_impl::Body::data(&mut until_close, &mut buf)
881             .await
882             .unwrap();
883         assert_eq!(read, 0);
884     }
885 }
886