1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use core::pin::Pin; 15 use core::task::{Context, Poll}; 16 use std::future::Future; 17 use std::io::{Cursor, Read}; 18 use std::sync::Arc; 19 20 use ylong_http::body::async_impl::Body; 21 use ylong_http::body::TextBodyDecoder; 22 #[cfg(feature = "http1_1")] 23 use ylong_http::body::{ChunkBodyDecoder, ChunkState}; 24 use ylong_http::headers::Headers; 25 26 use super::conn::StreamData; 27 use crate::async_impl::interceptor::Interceptors; 28 use crate::error::{ErrorKind, HttpClientError}; 29 use crate::runtime::{AsyncRead, ReadBuf, Sleep}; 30 use crate::util::normalizer::BodyLength; 31 32 const TRAILER_SIZE: usize = 1024; 33 34 /// `HttpBody` is the body part of the `Response` returned by `Client::request`. 35 /// `HttpBody` implements `Body` trait, so users can call related methods to get 36 /// body data. 37 /// 38 /// # Examples 39 /// 40 /// ```no_run 41 /// use ylong_http_client::async_impl::{Body, Client, HttpBody, Request}; 42 /// use ylong_http_client::HttpClientError; 43 /// 44 /// async fn read_body() -> Result<(), HttpClientError> { 45 /// let client = Client::new(); 46 /// 47 /// // `HttpBody` is the body part of `response`. 48 /// let mut response = client 49 /// .request(Request::builder().body(Body::empty())?) 50 /// .await?; 51 /// 52 /// // Users can use `Body::data` to get body data. 53 /// let mut buf = [0u8; 1024]; 54 /// loop { 55 /// let size = response.data(&mut buf).await.unwrap(); 56 /// if size == 0 { 57 /// break; 58 /// } 59 /// let _data = &buf[..size]; 60 /// // Deals with the data. 61 /// } 62 /// Ok(()) 63 /// } 64 /// ``` 65 pub struct HttpBody { 66 kind: Kind, 67 sleep: Option<Pin<Box<Sleep>>>, 68 } 69 70 type BoxStreamData = Box<dyn StreamData + Sync + Send + Unpin>; 71 72 impl HttpBody { 73 pub(crate) fn new( 74 interceptors: Arc<Interceptors>, 75 body_length: BodyLength, 76 io: BoxStreamData, 77 pre: &[u8], 78 ) -> Result<Self, HttpClientError> { 79 let kind = match body_length { 80 BodyLength::Empty => { 81 if !pre.is_empty() { 82 // TODO: Consider the case where BodyLength is empty but pre is not empty. 83 io.shutdown(); 84 return err_from_msg!(Request, "Body length is 0 but read extra data"); 85 } 86 Kind::Empty 87 } 88 BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io, interceptors)), 89 BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io, interceptors)), 90 91 #[cfg(feature = "http1_1")] 92 BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io, interceptors)), 93 }; 94 Ok(Self { kind, sleep: None }) 95 } 96 97 pub(crate) fn set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) { 98 self.sleep = sleep; 99 } 100 } 101 102 impl Body for HttpBody { 103 type Error = HttpClientError; 104 poll_datanull105 fn poll_data( 106 mut self: Pin<&mut Self>, 107 cx: &mut Context<'_>, 108 buf: &mut [u8], 109 ) -> Poll<Result<usize, Self::Error>> { 110 if buf.is_empty() { 111 return Poll::Ready(Ok(0)); 112 } 113 114 if let Some(delay) = self.sleep.as_mut() { 115 if let Poll::Ready(()) = Pin::new(delay).poll(cx) { 116 return Poll::Ready(err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into())); 117 } 118 } 119 120 match self.kind { 121 Kind::Empty => Poll::Ready(Ok(0)), 122 Kind::Text(ref mut text) => text.data(cx, buf), 123 Kind::UntilClose(ref mut until_close) => until_close.data(cx, buf), 124 #[cfg(feature = "http1_1")] 125 Kind::Chunk(ref mut chunk) => chunk.data(cx, buf), 126 } 127 } 128 poll_trailernull129 fn poll_trailer( 130 mut self: Pin<&mut Self>, 131 cx: &mut Context<'_>, 132 ) -> Poll<Result<Option<Headers>, Self::Error>> { 133 // Get trailer data from io 134 if let Some(delay) = self.sleep.as_mut() { 135 if let Poll::Ready(()) = Pin::new(delay).poll(cx) { 136 return Poll::Ready(err_from_msg!(Timeout, "Request timeout")); 137 } 138 } 139 140 let mut read_buf = [0_u8; TRAILER_SIZE]; 141 142 match self.kind { 143 #[cfg(feature = "http1_1")] 144 Kind::Chunk(ref mut chunk) => { 145 match chunk.data(cx, &mut read_buf) { 146 Poll::Ready(Ok(_)) => {} 147 Poll::Pending => { 148 return Poll::Pending; 149 } 150 Poll::Ready(Err(e)) => { 151 return Poll::Ready(Err(e)); 152 } 153 } 154 Poll::Ready(Ok(chunk.decoder.get_trailer().map_err(|e| { 155 HttpClientError::from_error(ErrorKind::BodyDecode, e) 156 })?)) 157 } 158 _ => Poll::Ready(Ok(None)), 159 } 160 } 161 } 162 163 impl Drop for HttpBody { dropnull164 fn drop(&mut self) { 165 let io = match self.kind { 166 Kind::Text(ref mut text) => text.io.as_mut(), 167 #[cfg(feature = "http1_1")] 168 Kind::Chunk(ref mut chunk) => chunk.io.as_mut(), 169 Kind::UntilClose(ref mut until_close) => until_close.io.as_mut(), 170 _ => None, 171 }; 172 // If response body is not totally read, shutdown io. 173 if let Some(io) = io { 174 io.shutdown() 175 } 176 } 177 } 178 179 // TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation. 180 enum Kind { 181 Empty, 182 Text(Text), 183 #[cfg(feature = "http1_1")] 184 Chunk(Chunk), 185 UntilClose(UntilClose), 186 } 187 188 struct UntilClose { 189 interceptors: Arc<Interceptors>, 190 pre: Option<Cursor<Vec<u8>>>, 191 io: Option<BoxStreamData>, 192 } 193 194 impl UntilClose { 195 pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self { 196 Self { 197 interceptors, 198 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 199 io: Some(io), 200 } 201 } 202 datanull203 fn data( 204 &mut self, 205 cx: &mut Context<'_>, 206 buf: &mut [u8], 207 ) -> Poll<Result<usize, HttpClientError>> { 208 if buf.is_empty() { 209 return Poll::Ready(Ok(0)); 210 } 211 let mut read = 0; 212 if let Some(pre) = self.pre.as_mut() { 213 // Here cursor read never failed. 214 let this_read = Read::read(pre, buf).unwrap(); 215 if this_read == 0 { 216 self.pre = None; 217 } else { 218 read += this_read; 219 } 220 } 221 222 if !buf[read..].is_empty() { 223 if let Some(io) = self.io.take() { 224 return self.poll_read_io(cx, io, read, buf); 225 } 226 } 227 Poll::Ready(Ok(read)) 228 } 229 poll_read_ionull230 fn poll_read_io( 231 &mut self, 232 cx: &mut Context<'_>, 233 mut io: BoxStreamData, 234 read: usize, 235 buf: &mut [u8], 236 ) -> Poll<Result<usize, HttpClientError>> { 237 let mut read = read; 238 let mut read_buf = ReadBuf::new(&mut buf[read..]); 239 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 240 // Disconnected. 241 Poll::Ready(Ok(())) => { 242 let filled = read_buf.filled().len(); 243 if filled == 0 { 244 io.shutdown(); 245 } else { 246 self.interceptors 247 .intercept_output(&buf[read..(read + filled)])?; 248 self.io = Some(io); 249 } 250 read += filled; 251 Poll::Ready(Ok(read)) 252 } 253 Poll::Pending => { 254 self.io = Some(io); 255 if read != 0 { 256 return Poll::Ready(Ok(read)); 257 } 258 Poll::Pending 259 } 260 Poll::Ready(Err(e)) => { 261 // If IO error occurs, shutdowns `io` before return. 262 io.shutdown(); 263 Poll::Ready(err_from_io!(BodyTransfer, e)) 264 } 265 } 266 } 267 } 268 269 struct Text { 270 interceptors: Arc<Interceptors>, 271 decoder: TextBodyDecoder, 272 pre: Option<Cursor<Vec<u8>>>, 273 io: Option<BoxStreamData>, 274 } 275 276 impl Text { 277 pub(crate) fn new( 278 len: u64, 279 pre: &[u8], 280 io: BoxStreamData, 281 interceptors: Arc<Interceptors>, 282 ) -> Self { 283 Self { 284 interceptors, 285 decoder: TextBodyDecoder::new(len), 286 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 287 io: Some(io), 288 } 289 } 290 } 291 292 impl Text { datanull293 fn data( 294 &mut self, 295 cx: &mut Context<'_>, 296 buf: &mut [u8], 297 ) -> Poll<Result<usize, HttpClientError>> { 298 if buf.is_empty() { 299 return Poll::Ready(Ok(0)); 300 } 301 302 let mut read = 0; 303 304 if let Some(pre) = self.pre.as_mut() { 305 // Here cursor read never failed. 306 let this_read = Read::read(pre, buf).unwrap(); 307 if this_read == 0 { 308 self.pre = None; 309 } else { 310 read += this_read; 311 if let Some(result) = self.read_remaining(buf, read) { 312 return result; 313 } 314 } 315 } 316 317 if !buf[read..].is_empty() { 318 if let Some(io) = self.io.take() { 319 return self.poll_read_io(cx, buf, io, read); 320 } 321 } 322 Poll::Ready(Ok(read)) 323 } 324 read_remainingnull325 fn read_remaining( 326 &mut self, 327 buf: &mut [u8], 328 read: usize, 329 ) -> Option<Poll<Result<usize, HttpClientError>>> { 330 let (text, rem) = self.decoder.decode(&buf[..read]); 331 332 // Contains redundant `rem`, return error. 333 match (text.is_complete(), rem.is_empty()) { 334 (true, false) => { 335 if let Some(io) = self.io.take() { 336 io.shutdown(); 337 }; 338 Some(Poll::Ready(err_from_msg!(BodyDecode, "Not eof"))) 339 } 340 (true, true) => { 341 if let Some(io) = self.io.take() { 342 // stream not closed, waiting for the fin 343 if !io.is_stream_closable() { 344 self.io = Some(io); 345 } 346 } 347 Some(Poll::Ready(Ok(read))) 348 } 349 // TextBodyDecoder decodes as much as possible here. 350 _ => None, 351 } 352 } 353 poll_read_ionull354 fn poll_read_io( 355 &mut self, 356 cx: &mut Context<'_>, 357 buf: &mut [u8], 358 mut io: BoxStreamData, 359 read: usize, 360 ) -> Poll<Result<usize, HttpClientError>> { 361 let mut read = read; 362 let mut read_buf = ReadBuf::new(&mut buf[read..]); 363 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 364 // Disconnected. 365 Poll::Ready(Ok(())) => { 366 let filled = read_buf.filled().len(); 367 if filled == 0 { 368 // stream closed, and get the fin 369 if io.is_stream_closable() && self.decoder.decode(&buf[..0]).0.is_complete() { 370 return Poll::Ready(Ok(0)); 371 } 372 io.shutdown(); 373 return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete")); 374 } 375 let (text, rem) = self.decoder.decode(read_buf.filled()); 376 self.interceptors.intercept_output(read_buf.filled())?; 377 read += filled; 378 // Contains redundant `rem`, return error. 379 match (text.is_complete(), rem.is_empty()) { 380 (true, false) => { 381 io.shutdown(); 382 Poll::Ready(err_from_msg!(BodyDecode, "Not eof")) 383 } 384 (true, true) => { 385 if !io.is_stream_closable() { 386 // stream not closed, waiting for the fin 387 self.io = Some(io); 388 } 389 Poll::Ready(Ok(read)) 390 } 391 _ => { 392 self.io = Some(io); 393 Poll::Ready(Ok(read)) 394 } 395 } 396 } 397 Poll::Pending => { 398 self.io = Some(io); 399 if read != 0 { 400 return Poll::Ready(Ok(read)); 401 } 402 Poll::Pending 403 } 404 Poll::Ready(Err(e)) => { 405 // If IO error occurs, shutdowns `io` before return. 406 io.shutdown(); 407 Poll::Ready(err_from_io!(BodyDecode, e)) 408 } 409 } 410 } 411 } 412 413 #[cfg(feature = "http1_1")] 414 struct Chunk { 415 interceptors: Arc<Interceptors>, 416 decoder: ChunkBodyDecoder, 417 pre: Option<Cursor<Vec<u8>>>, 418 io: Option<BoxStreamData>, 419 } 420 421 #[cfg(feature = "http1_1")] 422 impl Chunk { 423 pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self { 424 Self { 425 interceptors, 426 decoder: ChunkBodyDecoder::new().contains_trailer(true), 427 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 428 io: Some(io), 429 } 430 } 431 } 432 433 #[cfg(feature = "http1_1")] 434 impl Chunk { datanull435 fn data( 436 &mut self, 437 cx: &mut Context<'_>, 438 buf: &mut [u8], 439 ) -> Poll<Result<usize, HttpClientError>> { 440 if buf.is_empty() { 441 return Poll::Ready(Ok(0)); 442 } 443 444 let mut read = 0; 445 446 while let Some(pre) = self.pre.as_mut() { 447 // Here cursor read never failed. 448 let size = Read::read(pre, &mut buf[read..]).unwrap(); 449 if size == 0 { 450 self.pre = None; 451 } 452 453 let (size, flag) = self.merge_chunks(&mut buf[read..read + size])?; 454 read += size; 455 456 if flag { 457 // Return if we find a 0-sized chunk. 458 self.io = None; 459 return Poll::Ready(Ok(read)); 460 } else if read != 0 { 461 // Return if we get some data. 462 return Poll::Ready(Ok(read)); 463 } 464 } 465 466 // Here `read` must be 0. 467 while let Some(mut io) = self.io.take() { 468 let mut read_buf = ReadBuf::new(&mut buf[read..]); 469 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 470 Poll::Ready(Ok(())) => { 471 let filled = read_buf.filled().len(); 472 if filled == 0 { 473 io.shutdown(); 474 return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete")); 475 } 476 let (size, flag) = self.merge_chunks(read_buf.filled_mut())?; 477 self.interceptors.intercept_output(read_buf.filled_mut())?; 478 read += size; 479 if flag { 480 // Return if we find a 0-sized chunk. 481 // Return if we get some data. 482 return Poll::Ready(Ok(read)); 483 } 484 self.io = Some(io); 485 if read != 0 { 486 return Poll::Ready(Ok(read)); 487 } 488 } 489 Poll::Pending => { 490 self.io = Some(io); 491 return Poll::Pending; 492 } 493 Poll::Ready(Err(e)) => { 494 // If IO error occurs, shutdowns `io` before return. 495 io.shutdown(); 496 return Poll::Ready(err_from_io!(BodyDecode, e)); 497 } 498 } 499 } 500 501 Poll::Ready(Ok(read)) 502 } 503 merge_chunksnull504 fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> { 505 // Here we need to merge the chunks into one data block and return. 506 // The data arrangement in buf is as follows: 507 // 508 // data in buf: 509 // +------+------+------+------+------+------+------+ 510 // | data | len | data | len | ... | data | len | 511 // +------+------+------+------+------+------+------+ 512 // 513 // We need to merge these data blocks into one block: 514 // 515 // after merge: 516 // +---------------------------+ 517 // | data | 518 // +---------------------------+ 519 520 let (chunks, junk) = self 521 .decoder 522 .decode(buf) 523 .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?; 524 525 let mut finished = false; 526 let mut ptrs = Vec::new(); 527 528 for chunk in chunks.into_iter() { 529 if chunk.trailer().is_some() { 530 if chunk.state() == &ChunkState::Finish { 531 finished = true; 532 } 533 } else { 534 if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize { 535 finished = true; 536 break; 537 } 538 let data = chunk.data(); 539 ptrs.push((data.as_ptr(), data.len())) 540 } 541 } 542 543 if finished && !junk.is_empty() { 544 return err_from_msg!(BodyDecode, "Invalid chunk body"); 545 } 546 547 let start = buf.as_ptr(); 548 549 let mut idx = 0; 550 for (ptr, len) in ptrs.into_iter() { 551 let st = ptr as usize - start as usize; 552 let ed = st + len; 553 buf.copy_within(st..ed, idx); 554 idx += len; 555 } 556 Ok((idx, finished)) 557 } 558 } 559 560 #[cfg(feature = "ylong_base")] 561 #[cfg(test)] 562 mod ut_async_http_body { 563 use std::sync::Arc; 564 565 use ylong_http::body::async_impl; 566 567 use crate::async_impl::interceptor::IdleInterceptor; 568 use crate::async_impl::HttpBody; 569 use crate::util::normalizer::BodyLength; 570 use crate::ErrorKind; 571 572 /// UT test cases for `HttpBody::trailer`. 573 /// 574 /// # Brief 575 /// 1. Creates a `HttpBody` by calling `HttpBody::new`. 576 /// 2. Calls `trailer` to get headers. 577 /// 3. Checks if the test result is correct. 578 #[test] ut_asnyc_chunk_trailer_1null579 fn ut_asnyc_chunk_trailer_1() { 580 let handle = ylong_runtime::spawn(async move { 581 async_chunk_trailer_1().await; 582 async_chunk_trailer_2().await; 583 }); 584 ylong_runtime::block_on(handle).unwrap(); 585 } 586 587 async fn async_chunk_trailer_1() { 588 let box_stream = Box::new("".as_bytes()); 589 let chunk_body_bytes = "\ 590 5\r\n\ 591 hello\r\n\ 592 C ; type = text ;end = !\r\n\ 593 hello world!\r\n\ 594 000; message = last\r\n\ 595 accept:text/html\r\n\r\n\ 596 "; 597 let mut chunk = HttpBody::new( 598 Arc::new(IdleInterceptor), 599 BodyLength::Chunk, 600 box_stream, 601 chunk_body_bytes.as_bytes(), 602 ) 603 .unwrap(); 604 let res = async_impl::Body::trailer(&mut chunk) 605 .await 606 .unwrap() 607 .unwrap(); 608 assert_eq!( 609 res.get("accept").unwrap().to_string().unwrap(), 610 "text/html".to_string() 611 ); 612 let box_stream = Box::new("".as_bytes()); 613 let chunk_body_no_trailer_bytes = "\ 614 5\r\n\ 615 hello\r\n\ 616 C ; type = text ;end = !\r\n\ 617 hello world!\r\n\ 618 0\r\n\r\n\ 619 "; 620 621 let mut chunk = HttpBody::new( 622 Arc::new(IdleInterceptor), 623 BodyLength::Chunk, 624 box_stream, 625 chunk_body_no_trailer_bytes.as_bytes(), 626 ) 627 .unwrap(); 628 629 let mut buf = [0u8; 32]; 630 // Read body part 631 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 632 assert_eq!(read, 5); 633 assert_eq!(&buf[..read], b"hello"); 634 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 635 assert_eq!(read, 12); 636 assert_eq!(&buf[..read], b"hello world!"); 637 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 638 assert_eq!(read, 0); 639 assert_eq!(&buf[..read], b""); 640 // try read trailer part 641 let res = async_impl::Body::trailer(&mut chunk).await.unwrap(); 642 assert!(res.is_none()); 643 } 644 645 async fn async_chunk_trailer_2() { 646 let box_stream = Box::new("".as_bytes()); 647 let chunk_body_bytes = "\ 648 5\r\n\ 649 hello\r\n\ 650 C ; type = text ;end = !\r\n\ 651 hello world!\r\n\ 652 000; message = last\r\n\ 653 Expires: Wed, 21 Oct 2015 07:27:00 GMT \r\n\r\n\ 654 "; 655 let mut chunk = HttpBody::new( 656 Arc::new(IdleInterceptor), 657 BodyLength::Chunk, 658 box_stream, 659 chunk_body_bytes.as_bytes(), 660 ) 661 .unwrap(); 662 let res = async_impl::Body::trailer(&mut chunk) 663 .await 664 .unwrap() 665 .unwrap(); 666 assert_eq!( 667 res.get("expires").unwrap().to_string().unwrap(), 668 "Wed, 21 Oct 2015 07:27:00 GMT".to_string() 669 ); 670 } 671 672 /// UT test cases for `Body::data`. 673 /// 674 /// # Brief 675 /// 1. Creates a chunk `HttpBody`. 676 /// 2. Calls `data` method get boxstream. 677 /// 3. Checks if data size is correct. 678 #[test] ut_asnyc_http_body_chunk2null679 fn ut_asnyc_http_body_chunk2() { 680 let handle = ylong_runtime::spawn(async move { 681 http_body_chunk2().await; 682 }); 683 ylong_runtime::block_on(handle).unwrap(); 684 } 685 686 async fn http_body_chunk2() { 687 let box_stream = Box::new( 688 "\ 689 5\r\n\ 690 hello\r\n\ 691 C ; type = text ;end = !\r\n\ 692 hello world!\r\n\ 693 000; message = last\r\n\ 694 accept:text/html\r\n\r\n\ 695 " 696 .as_bytes(), 697 ); 698 let chunk_body_bytes = ""; 699 let mut chunk = HttpBody::new( 700 Arc::new(IdleInterceptor), 701 BodyLength::Chunk, 702 box_stream, 703 chunk_body_bytes.as_bytes(), 704 ) 705 .unwrap(); 706 707 let mut buf = [0u8; 32]; 708 // Read body part 709 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 710 assert_eq!(read, 5); 711 712 let box_stream = Box::new("".as_bytes()); 713 let chunk_body_no_trailer_bytes = "\ 714 5\r\n\ 715 hello\r\n\ 716 C ; type = text ;end = !\r\n\ 717 hello world!\r\n\ 718 0\r\n\r\n\ 719 "; 720 721 let mut chunk = HttpBody::new( 722 Arc::new(IdleInterceptor), 723 BodyLength::Chunk, 724 box_stream, 725 chunk_body_no_trailer_bytes.as_bytes(), 726 ) 727 .unwrap(); 728 729 let mut buf = [0u8; 32]; 730 // Read body part 731 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 732 assert_eq!(read, 5); 733 assert_eq!(&buf[..read], b"hello"); 734 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 735 assert_eq!(read, 12); 736 assert_eq!(&buf[..read], b"hello world!"); 737 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 738 assert_eq!(read, 0); 739 assert_eq!(&buf[..read], b""); 740 let res = async_impl::Body::trailer(&mut chunk).await.unwrap(); 741 assert!(res.is_none()); 742 } 743 744 /// UT test cases for `Body::data`. 745 /// 746 /// # Brief 747 /// 1. Creates a empty `HttpBody`. 748 /// 2. Calls `HttpBody::new` to create empty http body. 749 /// 3. Checks if http body is empty. 750 #[test] http_body_empty_errnull751 fn http_body_empty_err() { 752 let box_stream = Box::new("".as_bytes()); 753 let content_bytes = "hello"; 754 755 match HttpBody::new( 756 Arc::new(IdleInterceptor), 757 BodyLength::Empty, 758 box_stream, 759 content_bytes.as_bytes(), 760 ) { 761 Ok(_) => (), 762 Err(e) => assert_eq!(e.error_kind(), ErrorKind::Request), 763 } 764 } 765 766 /// UT test cases for text `HttpBody::new`. 767 /// 768 /// # Brief 769 /// 1. Creates a text `HttpBody`. 770 /// 2. Calls `HttpBody::new` to create text http body. 771 /// 3. Checks if result is correct. 772 #[test] ut_http_body_textnull773 fn ut_http_body_text() { 774 let handle = ylong_runtime::spawn(async move { 775 http_body_text().await; 776 }); 777 ylong_runtime::block_on(handle).unwrap(); 778 } 779 780 async fn http_body_text() { 781 let box_stream = Box::new("hello world".as_bytes()); 782 let content_bytes = ""; 783 784 let mut text = HttpBody::new( 785 Arc::new(IdleInterceptor), 786 BodyLength::Length(11), 787 box_stream, 788 content_bytes.as_bytes(), 789 ) 790 .unwrap(); 791 792 let mut buf = [0u8; 5]; 793 // Read body part 794 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 795 assert_eq!(read, 5); 796 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 797 assert_eq!(read, 5); 798 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 799 assert_eq!(read, 1); 800 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 801 assert_eq!(read, 0); 802 803 let box_stream = Box::new("".as_bytes()); 804 let content_bytes = "hello"; 805 806 let mut text = HttpBody::new( 807 Arc::new(IdleInterceptor), 808 BodyLength::Length(5), 809 box_stream, 810 content_bytes.as_bytes(), 811 ) 812 .unwrap(); 813 814 let mut buf = [0u8; 32]; 815 // Read body part 816 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 817 assert_eq!(read, 5); 818 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 819 assert_eq!(read, 0); 820 } 821 822 /// UT test cases for until_close `HttpBody::new`. 823 /// 824 /// # Brief 825 /// 1. Creates a until_close `HttpBody`. 826 /// 2. Calls `HttpBody::new` to create until_close http body. 827 /// 3. Checks if result is correct. 828 #[test] ut_http_body_until_closenull829 fn ut_http_body_until_close() { 830 let handle = ylong_runtime::spawn(async move { 831 http_body_until_close().await; 832 }); 833 ylong_runtime::block_on(handle).unwrap(); 834 } 835 836 async fn http_body_until_close() { 837 let box_stream = Box::new("hello world".as_bytes()); 838 let content_bytes = ""; 839 840 let mut until_close = HttpBody::new( 841 Arc::new(IdleInterceptor), 842 BodyLength::UntilClose, 843 box_stream, 844 content_bytes.as_bytes(), 845 ) 846 .unwrap(); 847 848 let mut buf = [0u8; 5]; 849 // Read body part 850 let read = async_impl::Body::data(&mut until_close, &mut buf) 851 .await 852 .unwrap(); 853 assert_eq!(read, 5); 854 let read = async_impl::Body::data(&mut until_close, &mut buf) 855 .await 856 .unwrap(); 857 assert_eq!(read, 5); 858 let read = async_impl::Body::data(&mut until_close, &mut buf) 859 .await 860 .unwrap(); 861 assert_eq!(read, 1); 862 863 let box_stream = Box::new("".as_bytes()); 864 let content_bytes = "hello"; 865 866 let mut until_close = HttpBody::new( 867 Arc::new(IdleInterceptor), 868 BodyLength::UntilClose, 869 box_stream, 870 content_bytes.as_bytes(), 871 ) 872 .unwrap(); 873 874 let mut buf = [0u8; 5]; 875 // Read body part 876 let read = async_impl::Body::data(&mut until_close, &mut buf) 877 .await 878 .unwrap(); 879 assert_eq!(read, 5); 880 let read = async_impl::Body::data(&mut until_close, &mut buf) 881 .await 882 .unwrap(); 883 assert_eq!(read, 0); 884 } 885 } 886