1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 pub(crate) trait Dispatcher { 15 type Handle; 16 dispatchnull17 fn dispatch(&self) -> Option<Self::Handle>; 18 is_shutdownnull19 fn is_shutdown(&self) -> bool; 20 21 #[allow(dead_code)] is_goawaynull22 fn is_goaway(&self) -> bool; 23 } 24 25 pub(crate) enum ConnDispatcher<S> { 26 #[cfg(feature = "http1_1")] 27 Http1(http1::Http1Dispatcher<S>), 28 29 #[cfg(feature = "http2")] 30 Http2(http2::Http2Dispatcher<S>), 31 32 #[cfg(feature = "http3")] 33 Http3(http3::Http3Dispatcher<S>), 34 } 35 36 impl<S> Dispatcher for ConnDispatcher<S> { 37 type Handle = Conn<S>; 38 dispatchnull39 fn dispatch(&self) -> Option<Self::Handle> { 40 match self { 41 #[cfg(feature = "http1_1")] 42 Self::Http1(h1) => h1.dispatch().map(Conn::Http1), 43 44 #[cfg(feature = "http2")] 45 Self::Http2(h2) => h2.dispatch().map(Conn::Http2), 46 47 #[cfg(feature = "http3")] 48 Self::Http3(h3) => h3.dispatch().map(Conn::Http3), 49 } 50 } 51 is_shutdownnull52 fn is_shutdown(&self) -> bool { 53 match self { 54 #[cfg(feature = "http1_1")] 55 Self::Http1(h1) => h1.is_shutdown(), 56 57 #[cfg(feature = "http2")] 58 Self::Http2(h2) => h2.is_shutdown(), 59 60 #[cfg(feature = "http3")] 61 Self::Http3(h3) => h3.is_shutdown(), 62 } 63 } 64 is_goawaynull65 fn is_goaway(&self) -> bool { 66 match self { 67 #[cfg(feature = "http1_1")] 68 Self::Http1(h1) => h1.is_goaway(), 69 70 #[cfg(feature = "http2")] 71 Self::Http2(h2) => h2.is_goaway(), 72 73 #[cfg(feature = "http3")] 74 Self::Http3(h3) => h3.is_goaway(), 75 } 76 } 77 } 78 79 pub(crate) enum Conn<S> { 80 #[cfg(feature = "http1_1")] 81 Http1(http1::Http1Conn<S>), 82 83 #[cfg(feature = "http2")] 84 Http2(http2::Http2Conn<S>), 85 86 #[cfg(feature = "http3")] 87 Http3(http3::Http3Conn<S>), 88 } 89 90 #[cfg(feature = "http1_1")] 91 pub(crate) mod http1 { 92 use std::cell::UnsafeCell; 93 use std::sync::atomic::{AtomicBool, Ordering}; 94 use std::sync::Arc; 95 96 use super::{ConnDispatcher, Dispatcher}; 97 98 impl<S> ConnDispatcher<S> { 99 pub(crate) fn http1(io: S) -> Self { 100 Self::Http1(Http1Dispatcher::new(io)) 101 } 102 } 103 104 /// HTTP1-based connection manager, which can dispatch connections to other 105 /// threads according to HTTP1 syntax. 106 pub(crate) struct Http1Dispatcher<S> { 107 inner: Arc<Inner<S>>, 108 } 109 110 pub(crate) struct Inner<S> { 111 pub(crate) io: UnsafeCell<S>, 112 // `occupied` indicates that the connection is occupied. Only one coroutine 113 // can get the handle at the same time. Once the handle is fetched, the flag 114 // position is true. 115 pub(crate) occupied: AtomicBool, 116 // `shutdown` indicates that the connection need to be shut down. 117 pub(crate) shutdown: AtomicBool, 118 } 119 120 unsafe impl<S> Sync for Inner<S> {} 121 122 impl<S> Http1Dispatcher<S> { 123 pub(crate) fn new(io: S) -> Self { 124 Self { 125 inner: Arc::new(Inner { 126 io: UnsafeCell::new(io), 127 occupied: AtomicBool::new(false), 128 shutdown: AtomicBool::new(false), 129 }), 130 } 131 } 132 } 133 134 impl<S> Dispatcher for Http1Dispatcher<S> { 135 type Handle = Http1Conn<S>; 136 dispatchnull137 fn dispatch(&self) -> Option<Self::Handle> { 138 self.inner 139 .occupied 140 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) 141 .ok() 142 .map(|_| Http1Conn { 143 inner: self.inner.clone(), 144 }) 145 } 146 is_shutdownnull147 fn is_shutdown(&self) -> bool { 148 self.inner.shutdown.load(Ordering::Relaxed) 149 } 150 is_goawaynull151 fn is_goaway(&self) -> bool { 152 false 153 } 154 } 155 156 /// Handle returned to other threads for I/O operations. 157 pub(crate) struct Http1Conn<S> { 158 pub(crate) inner: Arc<Inner<S>>, 159 } 160 161 impl<S> Http1Conn<S> { 162 pub(crate) fn raw_mut(&mut self) -> &mut S { 163 // SAFETY: In the case of `HTTP1`, only one coroutine gets the handle 164 // at the same time. 165 unsafe { &mut *self.inner.io.get() } 166 } 167 168 pub(crate) fn shutdown(&self) { 169 self.inner.shutdown.store(true, Ordering::Release); 170 } 171 } 172 173 impl<S> Drop for Http1Conn<S> { dropnull174 fn drop(&mut self) { 175 self.inner.occupied.store(false, Ordering::Release) 176 } 177 } 178 } 179 180 #[cfg(feature = "http2")] 181 pub(crate) mod http2 { 182 use std::collections::HashMap; 183 use std::future::Future; 184 use std::marker::PhantomData; 185 use std::pin::Pin; 186 use std::sync::atomic::{AtomicBool, Ordering}; 187 use std::sync::{Arc, Mutex}; 188 use std::task::{Context, Poll}; 189 190 use ylong_http::error::HttpError; 191 use ylong_http::h2::{ 192 ErrorCode, Frame, FrameDecoder, FrameEncoder, FrameFlags, Goaway, H2Error, Payload, 193 RstStream, Settings, SettingsBuilder, StreamId, 194 }; 195 196 use crate::runtime::{ 197 bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, AsyncWriteExt, BoundedReceiver, 198 BoundedSender, SendError, UnboundedReceiver, UnboundedSender, WriteHalf, 199 }; 200 use crate::util::config::H2Config; 201 use crate::util::dispatcher::{ConnDispatcher, Dispatcher}; 202 use crate::util::h2::{ 203 ConnManager, FlowControl, H2StreamState, RecvData, RequestWrapper, SendData, 204 StreamEndState, Streams, 205 }; 206 use crate::ErrorKind::Request; 207 use crate::{ErrorKind, HttpClientError}; 208 const DEFAULT_MAX_FRAME_SIZE: usize = 2 << 13; 209 const DEFAULT_WINDOW_SIZE: u32 = 65535; 210 211 pub(crate) type ManagerSendFut = 212 Pin<Box<dyn Future<Output = Result<(), SendError<RespMessage>>> + Send + Sync>>; 213 214 pub(crate) enum RespMessage { 215 Output(Frame), 216 OutputExit(DispatchErrorKind), 217 } 218 219 pub(crate) enum OutputMessage { 220 Output(Frame), 221 OutputExit(DispatchErrorKind), 222 } 223 224 pub(crate) struct ReqMessage { 225 pub(crate) sender: BoundedSender<RespMessage>, 226 pub(crate) request: RequestWrapper, 227 } 228 229 #[derive(Debug, Eq, PartialEq, Copy, Clone)] 230 pub(crate) enum DispatchErrorKind { 231 H2(H2Error), 232 Io(std::io::ErrorKind), 233 ChannelClosed, 234 Disconnect, 235 } 236 237 // HTTP2-based connection manager, which can dispatch connections to other 238 // threads according to HTTP2 syntax. 239 pub(crate) struct Http2Dispatcher<S> { 240 pub(crate) allowed_cache: usize, 241 pub(crate) sender: UnboundedSender<ReqMessage>, 242 pub(crate) io_shutdown: Arc<AtomicBool>, 243 pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>, 244 pub(crate) _mark: PhantomData<S>, 245 } 246 247 pub(crate) struct Http2Conn<S> { 248 pub(crate) allow_cached_frames: usize, 249 // Sends frame to StreamController 250 pub(crate) sender: UnboundedSender<ReqMessage>, 251 pub(crate) receiver: RespReceiver, 252 pub(crate) io_shutdown: Arc<AtomicBool>, 253 pub(crate) _mark: PhantomData<S>, 254 } 255 256 pub(crate) struct StreamController { 257 // The connection close flag organizes new stream commits to the current connection when 258 // closed. 259 pub(crate) io_shutdown: Arc<AtomicBool>, 260 // The senders of all connected stream channels of response. 261 pub(crate) senders: HashMap<StreamId, BoundedSender<RespMessage>>, 262 pub(crate) curr_message: HashMap<StreamId, ManagerSendFut>, 263 // Stream information on the connection. 264 pub(crate) streams: Streams, 265 // Received GO_AWAY frame. 266 pub(crate) recved_go_away: Option<StreamId>, 267 // The last GO_AWAY frame sent by the client. 268 pub(crate) go_away_sync: GoAwaySync, 269 } 270 271 #[derive(Default)] 272 pub(crate) struct GoAwaySync { 273 pub(crate) going_away: Option<Goaway>, 274 } 275 276 #[derive(Default)] 277 pub(crate) struct SettingsSync { 278 pub(crate) settings: SettingsState, 279 } 280 281 #[derive(Default, Clone)] 282 pub(crate) enum SettingsState { 283 Acknowledging(Settings), 284 #[default] 285 Synced, 286 } 287 288 #[derive(Default)] 289 pub(crate) struct RespReceiver { 290 receiver: Option<BoundedReceiver<RespMessage>>, 291 } 292 293 impl<S> ConnDispatcher<S> 294 where 295 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, 296 { 297 pub(crate) fn http2(config: H2Config, io: S) -> Self { 298 Self::Http2(Http2Dispatcher::new(config, io)) 299 } 300 } 301 302 impl<S> Http2Dispatcher<S> 303 where 304 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, 305 { 306 pub(crate) fn new(config: H2Config, io: S) -> Self { 307 let settings = create_initial_settings(&config); 308 309 let mut flow = FlowControl::new(DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); 310 flow.setup_recv_window(config.conn_window_size()); 311 312 let streams = Streams::new(config.stream_window_size(), DEFAULT_WINDOW_SIZE, flow); 313 let shutdown_flag = Arc::new(AtomicBool::new(false)); 314 let controller = StreamController::new(streams, shutdown_flag.clone()); 315 316 let (input_tx, input_rx) = unbounded_channel(); 317 let (req_tx, req_rx) = unbounded_channel(); 318 319 // Error is not possible, so it is not handled for the time 320 // being. 321 let mut handles = Vec::with_capacity(3); 322 if input_tx.send(settings).is_ok() { 323 Self::launch( 324 config.allowed_cache_frame_size(), 325 config.use_huffman_coding(), 326 controller, 327 (input_tx, input_rx), 328 req_rx, 329 &mut handles, 330 io, 331 ); 332 } 333 Self { 334 allowed_cache: config.allowed_cache_frame_size(), 335 sender: req_tx, 336 io_shutdown: shutdown_flag, 337 handles, 338 _mark: PhantomData, 339 } 340 } 341 launchnull342 fn launch( 343 allow_num: usize, 344 use_huffman: bool, 345 controller: StreamController, 346 input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>), 347 req_rx: UnboundedReceiver<ReqMessage>, 348 handles: &mut Vec<crate::runtime::JoinHandle<()>>, 349 io: S, 350 ) { 351 let (resp_tx, resp_rx) = bounded_channel(allow_num); 352 let (read, write) = crate::runtime::split(io); 353 let settings_sync = Arc::new(Mutex::new(SettingsSync::default())); 354 let send_settings_sync = settings_sync.clone(); 355 let send = crate::runtime::spawn(async move { 356 let mut writer = write; 357 if async_send_preface(&mut writer).await.is_ok() { 358 let encoder = FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, use_huffman); 359 let mut send = 360 SendData::new(encoder, send_settings_sync, writer, input_channel.1); 361 let _ = Pin::new(&mut send).await; 362 } 363 }); 364 handles.push(send); 365 366 let recv_settings_sync = settings_sync.clone(); 367 let recv = crate::runtime::spawn(async move { 368 let decoder = FrameDecoder::new(); 369 let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx); 370 let _ = Pin::new(&mut recv).await; 371 }); 372 handles.push(recv); 373 374 let manager = crate::runtime::spawn(async move { 375 let mut conn_manager = 376 ConnManager::new(settings_sync, input_channel.0, resp_rx, req_rx, controller); 377 let _ = Pin::new(&mut conn_manager).await; 378 }); 379 handles.push(manager); 380 } 381 } 382 383 impl<S> Dispatcher for Http2Dispatcher<S> { 384 type Handle = Http2Conn<S>; 385 dispatchnull386 fn dispatch(&self) -> Option<Self::Handle> { 387 let sender = self.sender.clone(); 388 let handle = Http2Conn::new(self.allowed_cache, self.io_shutdown.clone(), sender); 389 Some(handle) 390 } 391 is_shutdownnull392 fn is_shutdown(&self) -> bool { 393 self.io_shutdown.load(Ordering::Relaxed) 394 } 395 is_goawaynull396 fn is_goaway(&self) -> bool { 397 // todo: goaway and shutdown 398 false 399 } 400 } 401 402 impl<S> Drop for Http2Dispatcher<S> { dropnull403 fn drop(&mut self) { 404 for handle in &self.handles { 405 #[cfg(feature = "ylong_base")] 406 handle.cancel(); 407 #[cfg(feature = "tokio_base")] 408 handle.abort(); 409 } 410 } 411 } 412 413 impl<S> Http2Conn<S> { 414 pub(crate) fn new( 415 allow_cached_num: usize, 416 io_shutdown: Arc<AtomicBool>, 417 sender: UnboundedSender<ReqMessage>, 418 ) -> Self { 419 Self { 420 allow_cached_frames: allow_cached_num, 421 sender, 422 receiver: RespReceiver::default(), 423 io_shutdown, 424 _mark: PhantomData, 425 } 426 } 427 428 pub(crate) fn send_frame_to_controller( 429 &mut self, 430 request: RequestWrapper, 431 ) -> Result<(), HttpClientError> { 432 let (tx, rx) = bounded_channel::<RespMessage>(self.allow_cached_frames); 433 self.receiver.set_receiver(rx); 434 self.sender 435 .send(ReqMessage { 436 sender: tx, 437 request, 438 }) 439 .map_err(|_| { 440 HttpClientError::from_str(ErrorKind::Request, "Request Sender Closed !") 441 }) 442 } 443 } 444 445 impl StreamController { 446 pub(crate) fn new(streams: Streams, shutdown: Arc<AtomicBool>) -> Self { 447 Self { 448 io_shutdown: shutdown, 449 senders: HashMap::new(), 450 curr_message: HashMap::new(), 451 streams, 452 recved_go_away: None, 453 go_away_sync: GoAwaySync::default(), 454 } 455 } 456 457 pub(crate) fn shutdown(&self) { 458 self.io_shutdown.store(true, Ordering::Release); 459 } 460 461 pub(crate) fn get_unsent_streams( 462 &mut self, 463 last_stream_id: StreamId, 464 ) -> Result<Vec<StreamId>, H2Error> { 465 // The last-stream-id in the subsequent GO_AWAY frame 466 // cannot be greater than the last-stream-id in the previous GO_AWAY frame. 467 if self.streams.max_send_id < last_stream_id { 468 return Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 469 } 470 self.streams.max_send_id = last_stream_id; 471 Ok(self.streams.get_go_away_streams(last_stream_id)) 472 } 473 474 pub(crate) fn send_message_to_stream( 475 &mut self, 476 cx: &mut Context<'_>, 477 stream_id: StreamId, 478 message: RespMessage, 479 ) -> Poll<Result<(), H2Error>> { 480 if let Some(sender) = self.senders.get(&stream_id) { 481 // If the client coroutine has exited, this frame is skipped. 482 let mut tx = { 483 let sender = sender.clone(); 484 let ft = async move { sender.send(message).await }; 485 Box::pin(ft) 486 }; 487 488 match tx.as_mut().poll(cx) { 489 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), 490 // The current coroutine sending the request exited prematurely. 491 Poll::Ready(Err(_)) => { 492 self.senders.remove(&stream_id); 493 Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError))) 494 } 495 Poll::Pending => { 496 self.curr_message.insert(stream_id, tx); 497 Poll::Pending 498 } 499 } 500 } else { 501 Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError))) 502 } 503 } 504 505 pub(crate) fn poll_blocked_message( 506 &mut self, 507 cx: &mut Context<'_>, 508 input_tx: &UnboundedSender<Frame>, 509 ) -> Poll<()> { 510 let keys: Vec<StreamId> = self.curr_message.keys().cloned().collect(); 511 let mut blocked = false; 512 513 for key in keys { 514 if let Some(mut task) = self.curr_message.remove(&key) { 515 match task.as_mut().poll(cx) { 516 Poll::Ready(Ok(_)) => {} 517 // The current coroutine sending the request exited prematurely. 518 Poll::Ready(Err(_)) => { 519 self.senders.remove(&key); 520 if let Some(state) = self.streams.stream_state(key) { 521 if !matches!(state, H2StreamState::Closed(_)) { 522 if let StreamEndState::OK = self.streams.send_local_reset(key) { 523 let rest_payload = 524 RstStream::new(ErrorCode::NoError.into_code()); 525 let frame = Frame::new( 526 key, 527 FrameFlags::empty(), 528 Payload::RstStream(rest_payload), 529 ); 530 // ignore the send error occurs here in order to finish all 531 // tasks. 532 let _ = input_tx.send(frame); 533 } 534 } 535 } 536 } 537 Poll::Pending => { 538 self.curr_message.insert(key, task); 539 blocked = true; 540 } 541 } 542 } 543 } 544 if blocked { 545 Poll::Pending 546 } else { 547 Poll::Ready(()) 548 } 549 } 550 } 551 552 impl RespReceiver { 553 pub(crate) fn set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>) { 554 self.receiver = Some(receiver); 555 } 556 557 pub(crate) async fn recv(&mut self) -> Result<Frame, HttpClientError> { 558 match self.receiver { 559 Some(ref mut receiver) => { 560 #[cfg(feature = "tokio_base")] 561 match receiver.recv().await { 562 None => err_from_msg!(Request, "Response Receiver Closed !"), 563 Some(message) => match message { 564 RespMessage::Output(frame) => Ok(frame), 565 RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 566 }, 567 } 568 569 #[cfg(feature = "ylong_base")] 570 match receiver.recv().await { 571 Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)), 572 Ok(message) => match message { 573 RespMessage::Output(frame) => Ok(frame), 574 RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 575 }, 576 } 577 } 578 // this will not happen. 579 None => Err(HttpClientError::from_str( 580 ErrorKind::Request, 581 "Invalid Frame Receiver !", 582 )), 583 } 584 } 585 586 pub(crate) fn poll_recv( 587 &mut self, 588 cx: &mut Context<'_>, 589 ) -> Poll<Result<Frame, HttpClientError>> { 590 if let Some(ref mut receiver) = self.receiver { 591 #[cfg(feature = "tokio_base")] 592 match receiver.poll_recv(cx) { 593 Poll::Ready(None) => { 594 Poll::Ready(err_from_msg!(Request, "Error receive response !")) 595 } 596 Poll::Ready(Some(message)) => match message { 597 RespMessage::Output(frame) => Poll::Ready(Ok(frame)), 598 RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))), 599 }, 600 Poll::Pending => Poll::Pending, 601 } 602 603 #[cfg(feature = "ylong_base")] 604 match receiver.poll_recv(cx) { 605 Poll::Ready(Err(e)) => { 606 Poll::Ready(Err(HttpClientError::from_error(ErrorKind::Request, e))) 607 } 608 Poll::Ready(Ok(message)) => match message { 609 RespMessage::Output(frame) => Poll::Ready(Ok(frame)), 610 RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))), 611 }, 612 Poll::Pending => Poll::Pending, 613 } 614 } else { 615 Poll::Ready(err_from_msg!(Request, "Invalid Frame Receiver !")) 616 } 617 } 618 } 619 620 async fn async_send_preface<S>(writer: &mut WriteHalf<S>) -> Result<(), DispatchErrorKind> 621 where 622 S: AsyncWrite + Unpin, 623 { 624 const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 625 writer 626 .write_all(PREFACE) 627 .await 628 .map_err(|e| DispatchErrorKind::Io(e.kind())) 629 } 630 631 pub(crate) fn create_initial_settings(config: &H2Config) -> Frame { 632 let settings = SettingsBuilder::new() 633 .max_header_list_size(config.max_header_list_size()) 634 .max_frame_size(config.max_frame_size()) 635 .header_table_size(config.header_table_size()) 636 .enable_push(config.enable_push()) 637 .initial_window_size(config.stream_window_size()) 638 .build(); 639 640 Frame::new(0, FrameFlags::new(0), Payload::Settings(settings)) 641 } 642 643 impl From<std::io::Error> for DispatchErrorKind { fromnull644 fn from(value: std::io::Error) -> Self { 645 DispatchErrorKind::Io(value.kind()) 646 } 647 } 648 649 impl From<H2Error> for DispatchErrorKind { fromnull650 fn from(err: H2Error) -> Self { 651 DispatchErrorKind::H2(err) 652 } 653 } 654 655 pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError { 656 match dispatch_error { 657 DispatchErrorKind::H2(e) => HttpClientError::from_error(Request, HttpError::from(e)), 658 DispatchErrorKind::Io(e) => { 659 HttpClientError::from_io_error(Request, std::io::Error::from(e)) 660 } 661 DispatchErrorKind::ChannelClosed => { 662 HttpClientError::from_str(Request, "Coroutine channel closed.") 663 } 664 DispatchErrorKind::Disconnect => { 665 HttpClientError::from_str(Request, "remote peer closed.") 666 } 667 } 668 } 669 } 670 671 #[cfg(feature = "http3")] 672 pub(crate) mod http3 { 673 use std::marker::PhantomData; 674 use std::pin::Pin; 675 use std::sync::atomic::{AtomicBool, Ordering}; 676 use std::sync::{Arc, Mutex}; 677 678 use ylong_http::error::HttpError; 679 use ylong_http::h3::{Frame, FrameDecoder, H3Error}; 680 681 use crate::async_impl::{ConnInfo, QuicConn}; 682 use crate::runtime::{ 683 bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, BoundedReceiver, BoundedSender, 684 UnboundedSender, 685 }; 686 use crate::util::config::H3Config; 687 use crate::util::data_ref::BodyDataRef; 688 use crate::util::dispatcher::{ConnDispatcher, Dispatcher}; 689 use crate::util::h3::io_manager::IOManager; 690 use crate::util::h3::stream_manager::StreamManager; 691 use crate::ErrorKind::Request; 692 use crate::{ErrorKind, HttpClientError}; 693 694 pub(crate) struct Http3Dispatcher<S> { 695 pub(crate) req_tx: UnboundedSender<ReqMessage>, 696 pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>, 697 pub(crate) _mark: PhantomData<S>, 698 pub(crate) io_shutdown: Arc<AtomicBool>, 699 pub(crate) io_goaway: Arc<AtomicBool>, 700 } 701 702 pub(crate) struct Http3Conn<S> { 703 pub(crate) sender: UnboundedSender<ReqMessage>, 704 pub(crate) resp_receiver: BoundedReceiver<RespMessage>, 705 pub(crate) resp_sender: BoundedSender<RespMessage>, 706 pub(crate) io_shutdown: Arc<AtomicBool>, 707 pub(crate) _mark: PhantomData<S>, 708 } 709 710 pub(crate) struct RequestWrapper { 711 pub(crate) header: Frame, 712 pub(crate) data: BodyDataRef, 713 } 714 715 #[derive(Debug, Clone)] 716 pub(crate) enum DispatchErrorKind { 717 H3(H3Error), 718 Io(std::io::ErrorKind), 719 Quic(quiche::Error), 720 ChannelClosed, 721 StreamFinished, 722 // todo: retry? 723 GoawayReceived, 724 Disconnect, 725 } 726 727 pub(crate) enum RespMessage { 728 Output(Frame), 729 OutputExit(DispatchErrorKind), 730 } 731 732 pub(crate) struct ReqMessage { 733 pub(crate) request: RequestWrapper, 734 pub(crate) frame_tx: BoundedSender<RespMessage>, 735 } 736 737 impl<S> Http3Dispatcher<S> 738 where 739 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static, 740 { 741 pub(crate) fn new(config: H3Config, io: S, quic_connection: QuicConn) -> Self { 742 let (req_tx, req_rx) = unbounded_channel(); 743 let (io_manager_tx, io_manager_rx) = unbounded_channel(); 744 let (stream_manager_tx, stream_manager_rx) = unbounded_channel(); 745 let mut handles = Vec::with_capacity(2); 746 let conn = Arc::new(Mutex::new(quic_connection)); 747 let io_shutdown = Arc::new(AtomicBool::new(false)); 748 let io_goaway = Arc::new(AtomicBool::new(false)); 749 let mut stream_manager = StreamManager::new( 750 conn.clone(), 751 io_manager_tx, 752 stream_manager_rx, 753 req_rx, 754 FrameDecoder::new( 755 config.qpack_blocked_streams() as usize, 756 config.qpack_max_table_capacity() as usize, 757 ), 758 io_shutdown.clone(), 759 io_goaway.clone(), 760 ); 761 let stream_handle = crate::runtime::spawn(async move { 762 if stream_manager.init(config).is_err() { 763 return; 764 } 765 let _ = Pin::new(&mut stream_manager).await; 766 }); 767 handles.push(stream_handle); 768 769 let io_handle = crate::runtime::spawn(async move { 770 let mut io_manager = IOManager::new(io, conn, io_manager_rx, stream_manager_tx); 771 let _ = Pin::new(&mut io_manager).await; 772 }); 773 handles.push(io_handle); 774 // read_rx gets readable stream ids and writable client channels, then read 775 // stream and send to the corresponding channel 776 Self { 777 req_tx, 778 handles, 779 _mark: PhantomData, 780 io_shutdown, 781 io_goaway, 782 } 783 } 784 } 785 786 impl<S> Http3Conn<S> { 787 pub(crate) fn new( 788 sender: UnboundedSender<ReqMessage>, 789 io_shutdown: Arc<AtomicBool>, 790 ) -> Self { 791 const CHANNEL_SIZE: usize = 3; 792 let (resp_sender, resp_receiver) = bounded_channel(CHANNEL_SIZE); 793 Self { 794 sender, 795 resp_sender, 796 resp_receiver, 797 _mark: PhantomData, 798 io_shutdown, 799 } 800 } 801 802 pub(crate) fn send_frame_to_reader( 803 &mut self, 804 request: RequestWrapper, 805 ) -> Result<(), HttpClientError> { 806 self.sender 807 .send(ReqMessage { 808 request, 809 frame_tx: self.resp_sender.clone(), 810 }) 811 .map_err(|_| { 812 HttpClientError::from_str(ErrorKind::Request, "Request Sender Closed !") 813 }) 814 } 815 816 pub(crate) async fn recv_resp(&mut self) -> Result<Frame, HttpClientError> { 817 #[cfg(feature = "tokio_base")] 818 match self.resp_receiver.recv().await { 819 None => err_from_msg!(Request, "Response Receiver Closed !"), 820 Some(message) => match message { 821 RespMessage::Output(frame) => Ok(frame), 822 RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 823 }, 824 } 825 826 #[cfg(feature = "ylong_base")] 827 match self.resp_receiver.recv().await { 828 Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)), 829 Ok(message) => match message { 830 RespMessage::Output(frame) => Ok(frame), 831 RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 832 }, 833 } 834 } 835 } 836 837 impl<S> ConnDispatcher<S> 838 where 839 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static, 840 { 841 pub(crate) fn http3(config: H3Config, io: S, quic_connection: QuicConn) -> Self { 842 Self::Http3(Http3Dispatcher::new(config, io, quic_connection)) 843 } 844 } 845 846 impl<S> Dispatcher for Http3Dispatcher<S> { 847 type Handle = Http3Conn<S>; 848 dispatchnull849 fn dispatch(&self) -> Option<Self::Handle> { 850 let sender = self.req_tx.clone(); 851 Some(Http3Conn::new(sender, self.io_shutdown.clone())) 852 } 853 is_shutdownnull854 fn is_shutdown(&self) -> bool { 855 self.io_shutdown.load(Ordering::Relaxed) 856 } 857 is_goawaynull858 fn is_goaway(&self) -> bool { 859 self.io_goaway.load(Ordering::Relaxed) 860 } 861 } 862 863 impl<S> Drop for Http3Dispatcher<S> { dropnull864 fn drop(&mut self) { 865 for handle in &self.handles { 866 #[cfg(feature = "tokio_base")] 867 handle.abort(); 868 #[cfg(feature = "ylong_base")] 869 handle.cancel(); 870 } 871 } 872 } 873 874 impl From<std::io::Error> for DispatchErrorKind { fromnull875 fn from(value: std::io::Error) -> Self { 876 DispatchErrorKind::Io(value.kind()) 877 } 878 } 879 880 impl From<H3Error> for DispatchErrorKind { fromnull881 fn from(err: H3Error) -> Self { 882 DispatchErrorKind::H3(err) 883 } 884 } 885 886 impl From<quiche::Error> for DispatchErrorKind { fromnull887 fn from(value: quiche::Error) -> Self { 888 DispatchErrorKind::Quic(value) 889 } 890 } 891 892 pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError { 893 match dispatch_error { 894 DispatchErrorKind::H3(e) => HttpClientError::from_error(Request, HttpError::from(e)), 895 DispatchErrorKind::Io(e) => { 896 HttpClientError::from_io_error(Request, std::io::Error::from(e)) 897 } 898 DispatchErrorKind::ChannelClosed => { 899 HttpClientError::from_str(Request, "Coroutine channel closed.") 900 } 901 DispatchErrorKind::Quic(e) => HttpClientError::from_error(Request, e), 902 DispatchErrorKind::GoawayReceived => { 903 HttpClientError::from_str(Request, "received remote goaway.") 904 } 905 DispatchErrorKind::StreamFinished => { 906 HttpClientError::from_str(Request, "stream finished.") 907 } 908 DispatchErrorKind::Disconnect => { 909 HttpClientError::from_str(Request, "remote peer closed.") 910 } 911 } 912 } 913 } 914 915 #[cfg(test)] 916 mod ut_dispatch { 917 use crate::dispatcher::{ConnDispatcher, Dispatcher}; 918 919 /// UT test cases for `ConnDispatcher::is_shutdown`. 920 /// 921 /// # Brief 922 /// 1. Creates a `ConnDispatcher`. 923 /// 2. Calls `ConnDispatcher::is_shutdown` to get the result. 924 /// 3. Calls `ConnDispatcher::dispatch` to get the result. 925 /// 4. Checks if the result is false. 926 #[test] ut_is_shutdownnull927 fn ut_is_shutdown() { 928 let conn = ConnDispatcher::http1(b"Data"); 929 let res = conn.is_shutdown(); 930 assert!(!res); 931 let res = conn.dispatch(); 932 assert!(res.is_some()); 933 } 934 } 935