16dbb5987Sopenharmony_ci// Copyright (c) 2023 Huawei Device Co., Ltd. 26dbb5987Sopenharmony_ci// Licensed under the Apache License, Version 2.0 (the "License"); 36dbb5987Sopenharmony_ci// you may not use this file except in compliance with the License. 46dbb5987Sopenharmony_ci// You may obtain a copy of the License at 56dbb5987Sopenharmony_ci// 66dbb5987Sopenharmony_ci// http://www.apache.org/licenses/LICENSE-2.0 76dbb5987Sopenharmony_ci// 86dbb5987Sopenharmony_ci// Unless required by applicable law or agreed to in writing, software 96dbb5987Sopenharmony_ci// distributed under the License is distributed on an "AS IS" BASIS, 106dbb5987Sopenharmony_ci// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 116dbb5987Sopenharmony_ci// See the License for the specific language governing permissions and 126dbb5987Sopenharmony_ci// limitations under the License. 136dbb5987Sopenharmony_ci 146dbb5987Sopenharmony_cipub(crate) trait Dispatcher { 156dbb5987Sopenharmony_ci type Handle; 166dbb5987Sopenharmony_ci 176dbb5987Sopenharmony_ci fn dispatch(&self) -> Option<Self::Handle>; 186dbb5987Sopenharmony_ci 196dbb5987Sopenharmony_ci fn is_shutdown(&self) -> bool; 206dbb5987Sopenharmony_ci 216dbb5987Sopenharmony_ci #[allow(dead_code)] 226dbb5987Sopenharmony_ci fn is_goaway(&self) -> bool; 236dbb5987Sopenharmony_ci} 246dbb5987Sopenharmony_ci 256dbb5987Sopenharmony_cipub(crate) enum ConnDispatcher<S> { 266dbb5987Sopenharmony_ci #[cfg(feature = "http1_1")] 276dbb5987Sopenharmony_ci Http1(http1::Http1Dispatcher<S>), 286dbb5987Sopenharmony_ci 296dbb5987Sopenharmony_ci #[cfg(feature = "http2")] 306dbb5987Sopenharmony_ci Http2(http2::Http2Dispatcher<S>), 316dbb5987Sopenharmony_ci 326dbb5987Sopenharmony_ci #[cfg(feature = "http3")] 336dbb5987Sopenharmony_ci Http3(http3::Http3Dispatcher<S>), 346dbb5987Sopenharmony_ci} 356dbb5987Sopenharmony_ci 366dbb5987Sopenharmony_ciimpl<S> Dispatcher for ConnDispatcher<S> { 376dbb5987Sopenharmony_ci type Handle = Conn<S>; 
386dbb5987Sopenharmony_ci 396dbb5987Sopenharmony_ci fn dispatch(&self) -> Option<Self::Handle> { 406dbb5987Sopenharmony_ci match self { 416dbb5987Sopenharmony_ci #[cfg(feature = "http1_1")] 426dbb5987Sopenharmony_ci Self::Http1(h1) => h1.dispatch().map(Conn::Http1), 436dbb5987Sopenharmony_ci 446dbb5987Sopenharmony_ci #[cfg(feature = "http2")] 456dbb5987Sopenharmony_ci Self::Http2(h2) => h2.dispatch().map(Conn::Http2), 466dbb5987Sopenharmony_ci 476dbb5987Sopenharmony_ci #[cfg(feature = "http3")] 486dbb5987Sopenharmony_ci Self::Http3(h3) => h3.dispatch().map(Conn::Http3), 496dbb5987Sopenharmony_ci } 506dbb5987Sopenharmony_ci } 516dbb5987Sopenharmony_ci 526dbb5987Sopenharmony_ci fn is_shutdown(&self) -> bool { 536dbb5987Sopenharmony_ci match self { 546dbb5987Sopenharmony_ci #[cfg(feature = "http1_1")] 556dbb5987Sopenharmony_ci Self::Http1(h1) => h1.is_shutdown(), 566dbb5987Sopenharmony_ci 576dbb5987Sopenharmony_ci #[cfg(feature = "http2")] 586dbb5987Sopenharmony_ci Self::Http2(h2) => h2.is_shutdown(), 596dbb5987Sopenharmony_ci 606dbb5987Sopenharmony_ci #[cfg(feature = "http3")] 616dbb5987Sopenharmony_ci Self::Http3(h3) => h3.is_shutdown(), 626dbb5987Sopenharmony_ci } 636dbb5987Sopenharmony_ci } 646dbb5987Sopenharmony_ci 656dbb5987Sopenharmony_ci fn is_goaway(&self) -> bool { 666dbb5987Sopenharmony_ci match self { 676dbb5987Sopenharmony_ci #[cfg(feature = "http1_1")] 686dbb5987Sopenharmony_ci Self::Http1(h1) => h1.is_goaway(), 696dbb5987Sopenharmony_ci 706dbb5987Sopenharmony_ci #[cfg(feature = "http2")] 716dbb5987Sopenharmony_ci Self::Http2(h2) => h2.is_goaway(), 726dbb5987Sopenharmony_ci 736dbb5987Sopenharmony_ci #[cfg(feature = "http3")] 746dbb5987Sopenharmony_ci Self::Http3(h3) => h3.is_goaway(), 756dbb5987Sopenharmony_ci } 766dbb5987Sopenharmony_ci } 776dbb5987Sopenharmony_ci} 786dbb5987Sopenharmony_ci 796dbb5987Sopenharmony_cipub(crate) enum Conn<S> { 806dbb5987Sopenharmony_ci #[cfg(feature = "http1_1")] 816dbb5987Sopenharmony_ci Http1(http1::Http1Conn<S>), 
826dbb5987Sopenharmony_ci 836dbb5987Sopenharmony_ci #[cfg(feature = "http2")] 846dbb5987Sopenharmony_ci Http2(http2::Http2Conn<S>), 856dbb5987Sopenharmony_ci 866dbb5987Sopenharmony_ci #[cfg(feature = "http3")] 876dbb5987Sopenharmony_ci Http3(http3::Http3Conn<S>), 886dbb5987Sopenharmony_ci} 896dbb5987Sopenharmony_ci 906dbb5987Sopenharmony_ci#[cfg(feature = "http1_1")] 916dbb5987Sopenharmony_cipub(crate) mod http1 { 926dbb5987Sopenharmony_ci use std::cell::UnsafeCell; 936dbb5987Sopenharmony_ci use std::sync::atomic::{AtomicBool, Ordering}; 946dbb5987Sopenharmony_ci use std::sync::Arc; 956dbb5987Sopenharmony_ci 966dbb5987Sopenharmony_ci use super::{ConnDispatcher, Dispatcher}; 976dbb5987Sopenharmony_ci 986dbb5987Sopenharmony_ci impl<S> ConnDispatcher<S> { 996dbb5987Sopenharmony_ci pub(crate) fn http1(io: S) -> Self { 1006dbb5987Sopenharmony_ci Self::Http1(Http1Dispatcher::new(io)) 1016dbb5987Sopenharmony_ci } 1026dbb5987Sopenharmony_ci } 1036dbb5987Sopenharmony_ci 1046dbb5987Sopenharmony_ci /// HTTP1-based connection manager, which can dispatch connections to other 1056dbb5987Sopenharmony_ci /// threads according to HTTP1 syntax. 1066dbb5987Sopenharmony_ci pub(crate) struct Http1Dispatcher<S> { 1076dbb5987Sopenharmony_ci inner: Arc<Inner<S>>, 1086dbb5987Sopenharmony_ci } 1096dbb5987Sopenharmony_ci 1106dbb5987Sopenharmony_ci pub(crate) struct Inner<S> { 1116dbb5987Sopenharmony_ci pub(crate) io: UnsafeCell<S>, 1126dbb5987Sopenharmony_ci // `occupied` indicates that the connection is occupied. Only one coroutine 1136dbb5987Sopenharmony_ci // can get the handle at the same time. Once the handle is fetched, the flag 1146dbb5987Sopenharmony_ci // position is true. 1156dbb5987Sopenharmony_ci pub(crate) occupied: AtomicBool, 1166dbb5987Sopenharmony_ci // `shutdown` indicates that the connection need to be shut down. 
1176dbb5987Sopenharmony_ci pub(crate) shutdown: AtomicBool, 1186dbb5987Sopenharmony_ci } 1196dbb5987Sopenharmony_ci 1206dbb5987Sopenharmony_ci unsafe impl<S> Sync for Inner<S> {} 1216dbb5987Sopenharmony_ci 1226dbb5987Sopenharmony_ci impl<S> Http1Dispatcher<S> { 1236dbb5987Sopenharmony_ci pub(crate) fn new(io: S) -> Self { 1246dbb5987Sopenharmony_ci Self { 1256dbb5987Sopenharmony_ci inner: Arc::new(Inner { 1266dbb5987Sopenharmony_ci io: UnsafeCell::new(io), 1276dbb5987Sopenharmony_ci occupied: AtomicBool::new(false), 1286dbb5987Sopenharmony_ci shutdown: AtomicBool::new(false), 1296dbb5987Sopenharmony_ci }), 1306dbb5987Sopenharmony_ci } 1316dbb5987Sopenharmony_ci } 1326dbb5987Sopenharmony_ci } 1336dbb5987Sopenharmony_ci 1346dbb5987Sopenharmony_ci impl<S> Dispatcher for Http1Dispatcher<S> { 1356dbb5987Sopenharmony_ci type Handle = Http1Conn<S>; 1366dbb5987Sopenharmony_ci 1376dbb5987Sopenharmony_ci fn dispatch(&self) -> Option<Self::Handle> { 1386dbb5987Sopenharmony_ci self.inner 1396dbb5987Sopenharmony_ci .occupied 1406dbb5987Sopenharmony_ci .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) 1416dbb5987Sopenharmony_ci .ok() 1426dbb5987Sopenharmony_ci .map(|_| Http1Conn { 1436dbb5987Sopenharmony_ci inner: self.inner.clone(), 1446dbb5987Sopenharmony_ci }) 1456dbb5987Sopenharmony_ci } 1466dbb5987Sopenharmony_ci 1476dbb5987Sopenharmony_ci fn is_shutdown(&self) -> bool { 1486dbb5987Sopenharmony_ci self.inner.shutdown.load(Ordering::Relaxed) 1496dbb5987Sopenharmony_ci } 1506dbb5987Sopenharmony_ci 1516dbb5987Sopenharmony_ci fn is_goaway(&self) -> bool { 1526dbb5987Sopenharmony_ci false 1536dbb5987Sopenharmony_ci } 1546dbb5987Sopenharmony_ci } 1556dbb5987Sopenharmony_ci 1566dbb5987Sopenharmony_ci /// Handle returned to other threads for I/O operations. 
1576dbb5987Sopenharmony_ci pub(crate) struct Http1Conn<S> { 1586dbb5987Sopenharmony_ci pub(crate) inner: Arc<Inner<S>>, 1596dbb5987Sopenharmony_ci } 1606dbb5987Sopenharmony_ci 1616dbb5987Sopenharmony_ci impl<S> Http1Conn<S> { 1626dbb5987Sopenharmony_ci pub(crate) fn raw_mut(&mut self) -> &mut S { 1636dbb5987Sopenharmony_ci // SAFETY: In the case of `HTTP1`, only one coroutine gets the handle 1646dbb5987Sopenharmony_ci // at the same time. 1656dbb5987Sopenharmony_ci unsafe { &mut *self.inner.io.get() } 1666dbb5987Sopenharmony_ci } 1676dbb5987Sopenharmony_ci 1686dbb5987Sopenharmony_ci pub(crate) fn shutdown(&self) { 1696dbb5987Sopenharmony_ci self.inner.shutdown.store(true, Ordering::Release); 1706dbb5987Sopenharmony_ci } 1716dbb5987Sopenharmony_ci } 1726dbb5987Sopenharmony_ci 1736dbb5987Sopenharmony_ci impl<S> Drop for Http1Conn<S> { 1746dbb5987Sopenharmony_ci fn drop(&mut self) { 1756dbb5987Sopenharmony_ci self.inner.occupied.store(false, Ordering::Release) 1766dbb5987Sopenharmony_ci } 1776dbb5987Sopenharmony_ci } 1786dbb5987Sopenharmony_ci} 1796dbb5987Sopenharmony_ci 1806dbb5987Sopenharmony_ci#[cfg(feature = "http2")] 1816dbb5987Sopenharmony_cipub(crate) mod http2 { 1826dbb5987Sopenharmony_ci use std::collections::HashMap; 1836dbb5987Sopenharmony_ci use std::future::Future; 1846dbb5987Sopenharmony_ci use std::marker::PhantomData; 1856dbb5987Sopenharmony_ci use std::pin::Pin; 1866dbb5987Sopenharmony_ci use std::sync::atomic::{AtomicBool, Ordering}; 1876dbb5987Sopenharmony_ci use std::sync::{Arc, Mutex}; 1886dbb5987Sopenharmony_ci use std::task::{Context, Poll}; 1896dbb5987Sopenharmony_ci 1906dbb5987Sopenharmony_ci use ylong_http::error::HttpError; 1916dbb5987Sopenharmony_ci use ylong_http::h2::{ 1926dbb5987Sopenharmony_ci ErrorCode, Frame, FrameDecoder, FrameEncoder, FrameFlags, Goaway, H2Error, Payload, 1936dbb5987Sopenharmony_ci RstStream, Settings, SettingsBuilder, StreamId, 1946dbb5987Sopenharmony_ci }; 1956dbb5987Sopenharmony_ci 1966dbb5987Sopenharmony_ci use 
crate::runtime::{ 1976dbb5987Sopenharmony_ci bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, AsyncWriteExt, BoundedReceiver, 1986dbb5987Sopenharmony_ci BoundedSender, SendError, UnboundedReceiver, UnboundedSender, WriteHalf, 1996dbb5987Sopenharmony_ci }; 2006dbb5987Sopenharmony_ci use crate::util::config::H2Config; 2016dbb5987Sopenharmony_ci use crate::util::dispatcher::{ConnDispatcher, Dispatcher}; 2026dbb5987Sopenharmony_ci use crate::util::h2::{ 2036dbb5987Sopenharmony_ci ConnManager, FlowControl, H2StreamState, RecvData, RequestWrapper, SendData, 2046dbb5987Sopenharmony_ci StreamEndState, Streams, 2056dbb5987Sopenharmony_ci }; 2066dbb5987Sopenharmony_ci use crate::ErrorKind::Request; 2076dbb5987Sopenharmony_ci use crate::{ErrorKind, HttpClientError}; 2086dbb5987Sopenharmony_ci const DEFAULT_MAX_FRAME_SIZE: usize = 2 << 13; 2096dbb5987Sopenharmony_ci const DEFAULT_WINDOW_SIZE: u32 = 65535; 2106dbb5987Sopenharmony_ci 2116dbb5987Sopenharmony_ci pub(crate) type ManagerSendFut = 2126dbb5987Sopenharmony_ci Pin<Box<dyn Future<Output = Result<(), SendError<RespMessage>>> + Send + Sync>>; 2136dbb5987Sopenharmony_ci 2146dbb5987Sopenharmony_ci pub(crate) enum RespMessage { 2156dbb5987Sopenharmony_ci Output(Frame), 2166dbb5987Sopenharmony_ci OutputExit(DispatchErrorKind), 2176dbb5987Sopenharmony_ci } 2186dbb5987Sopenharmony_ci 2196dbb5987Sopenharmony_ci pub(crate) enum OutputMessage { 2206dbb5987Sopenharmony_ci Output(Frame), 2216dbb5987Sopenharmony_ci OutputExit(DispatchErrorKind), 2226dbb5987Sopenharmony_ci } 2236dbb5987Sopenharmony_ci 2246dbb5987Sopenharmony_ci pub(crate) struct ReqMessage { 2256dbb5987Sopenharmony_ci pub(crate) sender: BoundedSender<RespMessage>, 2266dbb5987Sopenharmony_ci pub(crate) request: RequestWrapper, 2276dbb5987Sopenharmony_ci } 2286dbb5987Sopenharmony_ci 2296dbb5987Sopenharmony_ci #[derive(Debug, Eq, PartialEq, Copy, Clone)] 2306dbb5987Sopenharmony_ci pub(crate) enum DispatchErrorKind { 2316dbb5987Sopenharmony_ci H2(H2Error), 
2326dbb5987Sopenharmony_ci Io(std::io::ErrorKind), 2336dbb5987Sopenharmony_ci ChannelClosed, 2346dbb5987Sopenharmony_ci Disconnect, 2356dbb5987Sopenharmony_ci } 2366dbb5987Sopenharmony_ci 2376dbb5987Sopenharmony_ci // HTTP2-based connection manager, which can dispatch connections to other 2386dbb5987Sopenharmony_ci // threads according to HTTP2 syntax. 2396dbb5987Sopenharmony_ci pub(crate) struct Http2Dispatcher<S> { 2406dbb5987Sopenharmony_ci pub(crate) allowed_cache: usize, 2416dbb5987Sopenharmony_ci pub(crate) sender: UnboundedSender<ReqMessage>, 2426dbb5987Sopenharmony_ci pub(crate) io_shutdown: Arc<AtomicBool>, 2436dbb5987Sopenharmony_ci pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>, 2446dbb5987Sopenharmony_ci pub(crate) _mark: PhantomData<S>, 2456dbb5987Sopenharmony_ci } 2466dbb5987Sopenharmony_ci 2476dbb5987Sopenharmony_ci pub(crate) struct Http2Conn<S> { 2486dbb5987Sopenharmony_ci pub(crate) allow_cached_frames: usize, 2496dbb5987Sopenharmony_ci // Sends frame to StreamController 2506dbb5987Sopenharmony_ci pub(crate) sender: UnboundedSender<ReqMessage>, 2516dbb5987Sopenharmony_ci pub(crate) receiver: RespReceiver, 2526dbb5987Sopenharmony_ci pub(crate) io_shutdown: Arc<AtomicBool>, 2536dbb5987Sopenharmony_ci pub(crate) _mark: PhantomData<S>, 2546dbb5987Sopenharmony_ci } 2556dbb5987Sopenharmony_ci 2566dbb5987Sopenharmony_ci pub(crate) struct StreamController { 2576dbb5987Sopenharmony_ci // The connection close flag organizes new stream commits to the current connection when 2586dbb5987Sopenharmony_ci // closed. 2596dbb5987Sopenharmony_ci pub(crate) io_shutdown: Arc<AtomicBool>, 2606dbb5987Sopenharmony_ci // The senders of all connected stream channels of response. 2616dbb5987Sopenharmony_ci pub(crate) senders: HashMap<StreamId, BoundedSender<RespMessage>>, 2626dbb5987Sopenharmony_ci pub(crate) curr_message: HashMap<StreamId, ManagerSendFut>, 2636dbb5987Sopenharmony_ci // Stream information on the connection. 
2646dbb5987Sopenharmony_ci pub(crate) streams: Streams, 2656dbb5987Sopenharmony_ci // Received GO_AWAY frame. 2666dbb5987Sopenharmony_ci pub(crate) recved_go_away: Option<StreamId>, 2676dbb5987Sopenharmony_ci // The last GO_AWAY frame sent by the client. 2686dbb5987Sopenharmony_ci pub(crate) go_away_sync: GoAwaySync, 2696dbb5987Sopenharmony_ci } 2706dbb5987Sopenharmony_ci 2716dbb5987Sopenharmony_ci #[derive(Default)] 2726dbb5987Sopenharmony_ci pub(crate) struct GoAwaySync { 2736dbb5987Sopenharmony_ci pub(crate) going_away: Option<Goaway>, 2746dbb5987Sopenharmony_ci } 2756dbb5987Sopenharmony_ci 2766dbb5987Sopenharmony_ci #[derive(Default)] 2776dbb5987Sopenharmony_ci pub(crate) struct SettingsSync { 2786dbb5987Sopenharmony_ci pub(crate) settings: SettingsState, 2796dbb5987Sopenharmony_ci } 2806dbb5987Sopenharmony_ci 2816dbb5987Sopenharmony_ci #[derive(Default, Clone)] 2826dbb5987Sopenharmony_ci pub(crate) enum SettingsState { 2836dbb5987Sopenharmony_ci Acknowledging(Settings), 2846dbb5987Sopenharmony_ci #[default] 2856dbb5987Sopenharmony_ci Synced, 2866dbb5987Sopenharmony_ci } 2876dbb5987Sopenharmony_ci 2886dbb5987Sopenharmony_ci #[derive(Default)] 2896dbb5987Sopenharmony_ci pub(crate) struct RespReceiver { 2906dbb5987Sopenharmony_ci receiver: Option<BoundedReceiver<RespMessage>>, 2916dbb5987Sopenharmony_ci } 2926dbb5987Sopenharmony_ci 2936dbb5987Sopenharmony_ci impl<S> ConnDispatcher<S> 2946dbb5987Sopenharmony_ci where 2956dbb5987Sopenharmony_ci S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, 2966dbb5987Sopenharmony_ci { 2976dbb5987Sopenharmony_ci pub(crate) fn http2(config: H2Config, io: S) -> Self { 2986dbb5987Sopenharmony_ci Self::Http2(Http2Dispatcher::new(config, io)) 2996dbb5987Sopenharmony_ci } 3006dbb5987Sopenharmony_ci } 3016dbb5987Sopenharmony_ci 3026dbb5987Sopenharmony_ci impl<S> Http2Dispatcher<S> 3036dbb5987Sopenharmony_ci where 3046dbb5987Sopenharmony_ci S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, 3056dbb5987Sopenharmony_ci { 
3066dbb5987Sopenharmony_ci pub(crate) fn new(config: H2Config, io: S) -> Self { 3076dbb5987Sopenharmony_ci let settings = create_initial_settings(&config); 3086dbb5987Sopenharmony_ci 3096dbb5987Sopenharmony_ci let mut flow = FlowControl::new(DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); 3106dbb5987Sopenharmony_ci flow.setup_recv_window(config.conn_window_size()); 3116dbb5987Sopenharmony_ci 3126dbb5987Sopenharmony_ci let streams = Streams::new(config.stream_window_size(), DEFAULT_WINDOW_SIZE, flow); 3136dbb5987Sopenharmony_ci let shutdown_flag = Arc::new(AtomicBool::new(false)); 3146dbb5987Sopenharmony_ci let controller = StreamController::new(streams, shutdown_flag.clone()); 3156dbb5987Sopenharmony_ci 3166dbb5987Sopenharmony_ci let (input_tx, input_rx) = unbounded_channel(); 3176dbb5987Sopenharmony_ci let (req_tx, req_rx) = unbounded_channel(); 3186dbb5987Sopenharmony_ci 3196dbb5987Sopenharmony_ci // Error is not possible, so it is not handled for the time 3206dbb5987Sopenharmony_ci // being. 
3216dbb5987Sopenharmony_ci let mut handles = Vec::with_capacity(3); 3226dbb5987Sopenharmony_ci if input_tx.send(settings).is_ok() { 3236dbb5987Sopenharmony_ci Self::launch( 3246dbb5987Sopenharmony_ci config.allowed_cache_frame_size(), 3256dbb5987Sopenharmony_ci config.use_huffman_coding(), 3266dbb5987Sopenharmony_ci controller, 3276dbb5987Sopenharmony_ci (input_tx, input_rx), 3286dbb5987Sopenharmony_ci req_rx, 3296dbb5987Sopenharmony_ci &mut handles, 3306dbb5987Sopenharmony_ci io, 3316dbb5987Sopenharmony_ci ); 3326dbb5987Sopenharmony_ci } 3336dbb5987Sopenharmony_ci Self { 3346dbb5987Sopenharmony_ci allowed_cache: config.allowed_cache_frame_size(), 3356dbb5987Sopenharmony_ci sender: req_tx, 3366dbb5987Sopenharmony_ci io_shutdown: shutdown_flag, 3376dbb5987Sopenharmony_ci handles, 3386dbb5987Sopenharmony_ci _mark: PhantomData, 3396dbb5987Sopenharmony_ci } 3406dbb5987Sopenharmony_ci } 3416dbb5987Sopenharmony_ci 3426dbb5987Sopenharmony_ci fn launch( 3436dbb5987Sopenharmony_ci allow_num: usize, 3446dbb5987Sopenharmony_ci use_huffman: bool, 3456dbb5987Sopenharmony_ci controller: StreamController, 3466dbb5987Sopenharmony_ci input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>), 3476dbb5987Sopenharmony_ci req_rx: UnboundedReceiver<ReqMessage>, 3486dbb5987Sopenharmony_ci handles: &mut Vec<crate::runtime::JoinHandle<()>>, 3496dbb5987Sopenharmony_ci io: S, 3506dbb5987Sopenharmony_ci ) { 3516dbb5987Sopenharmony_ci let (resp_tx, resp_rx) = bounded_channel(allow_num); 3526dbb5987Sopenharmony_ci let (read, write) = crate::runtime::split(io); 3536dbb5987Sopenharmony_ci let settings_sync = Arc::new(Mutex::new(SettingsSync::default())); 3546dbb5987Sopenharmony_ci let send_settings_sync = settings_sync.clone(); 3556dbb5987Sopenharmony_ci let send = crate::runtime::spawn(async move { 3566dbb5987Sopenharmony_ci let mut writer = write; 3576dbb5987Sopenharmony_ci if async_send_preface(&mut writer).await.is_ok() { 3586dbb5987Sopenharmony_ci let encoder = 
FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, use_huffman); 3596dbb5987Sopenharmony_ci let mut send = 3606dbb5987Sopenharmony_ci SendData::new(encoder, send_settings_sync, writer, input_channel.1); 3616dbb5987Sopenharmony_ci let _ = Pin::new(&mut send).await; 3626dbb5987Sopenharmony_ci } 3636dbb5987Sopenharmony_ci }); 3646dbb5987Sopenharmony_ci handles.push(send); 3656dbb5987Sopenharmony_ci 3666dbb5987Sopenharmony_ci let recv_settings_sync = settings_sync.clone(); 3676dbb5987Sopenharmony_ci let recv = crate::runtime::spawn(async move { 3686dbb5987Sopenharmony_ci let decoder = FrameDecoder::new(); 3696dbb5987Sopenharmony_ci let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx); 3706dbb5987Sopenharmony_ci let _ = Pin::new(&mut recv).await; 3716dbb5987Sopenharmony_ci }); 3726dbb5987Sopenharmony_ci handles.push(recv); 3736dbb5987Sopenharmony_ci 3746dbb5987Sopenharmony_ci let manager = crate::runtime::spawn(async move { 3756dbb5987Sopenharmony_ci let mut conn_manager = 3766dbb5987Sopenharmony_ci ConnManager::new(settings_sync, input_channel.0, resp_rx, req_rx, controller); 3776dbb5987Sopenharmony_ci let _ = Pin::new(&mut conn_manager).await; 3786dbb5987Sopenharmony_ci }); 3796dbb5987Sopenharmony_ci handles.push(manager); 3806dbb5987Sopenharmony_ci } 3816dbb5987Sopenharmony_ci } 3826dbb5987Sopenharmony_ci 3836dbb5987Sopenharmony_ci impl<S> Dispatcher for Http2Dispatcher<S> { 3846dbb5987Sopenharmony_ci type Handle = Http2Conn<S>; 3856dbb5987Sopenharmony_ci 3866dbb5987Sopenharmony_ci fn dispatch(&self) -> Option<Self::Handle> { 3876dbb5987Sopenharmony_ci let sender = self.sender.clone(); 3886dbb5987Sopenharmony_ci let handle = Http2Conn::new(self.allowed_cache, self.io_shutdown.clone(), sender); 3896dbb5987Sopenharmony_ci Some(handle) 3906dbb5987Sopenharmony_ci } 3916dbb5987Sopenharmony_ci 3926dbb5987Sopenharmony_ci fn is_shutdown(&self) -> bool { 3936dbb5987Sopenharmony_ci self.io_shutdown.load(Ordering::Relaxed) 3946dbb5987Sopenharmony_ci } 
3956dbb5987Sopenharmony_ci 3966dbb5987Sopenharmony_ci fn is_goaway(&self) -> bool { 3976dbb5987Sopenharmony_ci // todo: goaway and shutdown 3986dbb5987Sopenharmony_ci false 3996dbb5987Sopenharmony_ci } 4006dbb5987Sopenharmony_ci } 4016dbb5987Sopenharmony_ci 4026dbb5987Sopenharmony_ci impl<S> Drop for Http2Dispatcher<S> { 4036dbb5987Sopenharmony_ci fn drop(&mut self) { 4046dbb5987Sopenharmony_ci for handle in &self.handles { 4056dbb5987Sopenharmony_ci #[cfg(feature = "ylong_base")] 4066dbb5987Sopenharmony_ci handle.cancel(); 4076dbb5987Sopenharmony_ci #[cfg(feature = "tokio_base")] 4086dbb5987Sopenharmony_ci handle.abort(); 4096dbb5987Sopenharmony_ci } 4106dbb5987Sopenharmony_ci } 4116dbb5987Sopenharmony_ci } 4126dbb5987Sopenharmony_ci 4136dbb5987Sopenharmony_ci impl<S> Http2Conn<S> { 4146dbb5987Sopenharmony_ci pub(crate) fn new( 4156dbb5987Sopenharmony_ci allow_cached_num: usize, 4166dbb5987Sopenharmony_ci io_shutdown: Arc<AtomicBool>, 4176dbb5987Sopenharmony_ci sender: UnboundedSender<ReqMessage>, 4186dbb5987Sopenharmony_ci ) -> Self { 4196dbb5987Sopenharmony_ci Self { 4206dbb5987Sopenharmony_ci allow_cached_frames: allow_cached_num, 4216dbb5987Sopenharmony_ci sender, 4226dbb5987Sopenharmony_ci receiver: RespReceiver::default(), 4236dbb5987Sopenharmony_ci io_shutdown, 4246dbb5987Sopenharmony_ci _mark: PhantomData, 4256dbb5987Sopenharmony_ci } 4266dbb5987Sopenharmony_ci } 4276dbb5987Sopenharmony_ci 4286dbb5987Sopenharmony_ci pub(crate) fn send_frame_to_controller( 4296dbb5987Sopenharmony_ci &mut self, 4306dbb5987Sopenharmony_ci request: RequestWrapper, 4316dbb5987Sopenharmony_ci ) -> Result<(), HttpClientError> { 4326dbb5987Sopenharmony_ci let (tx, rx) = bounded_channel::<RespMessage>(self.allow_cached_frames); 4336dbb5987Sopenharmony_ci self.receiver.set_receiver(rx); 4346dbb5987Sopenharmony_ci self.sender 4356dbb5987Sopenharmony_ci .send(ReqMessage { 4366dbb5987Sopenharmony_ci sender: tx, 4376dbb5987Sopenharmony_ci request, 4386dbb5987Sopenharmony_ci }) 
4396dbb5987Sopenharmony_ci .map_err(|_| { 4406dbb5987Sopenharmony_ci HttpClientError::from_str(ErrorKind::Request, "Request Sender Closed !") 4416dbb5987Sopenharmony_ci }) 4426dbb5987Sopenharmony_ci } 4436dbb5987Sopenharmony_ci } 4446dbb5987Sopenharmony_ci 4456dbb5987Sopenharmony_ci impl StreamController { 4466dbb5987Sopenharmony_ci pub(crate) fn new(streams: Streams, shutdown: Arc<AtomicBool>) -> Self { 4476dbb5987Sopenharmony_ci Self { 4486dbb5987Sopenharmony_ci io_shutdown: shutdown, 4496dbb5987Sopenharmony_ci senders: HashMap::new(), 4506dbb5987Sopenharmony_ci curr_message: HashMap::new(), 4516dbb5987Sopenharmony_ci streams, 4526dbb5987Sopenharmony_ci recved_go_away: None, 4536dbb5987Sopenharmony_ci go_away_sync: GoAwaySync::default(), 4546dbb5987Sopenharmony_ci } 4556dbb5987Sopenharmony_ci } 4566dbb5987Sopenharmony_ci 4576dbb5987Sopenharmony_ci pub(crate) fn shutdown(&self) { 4586dbb5987Sopenharmony_ci self.io_shutdown.store(true, Ordering::Release); 4596dbb5987Sopenharmony_ci } 4606dbb5987Sopenharmony_ci 4616dbb5987Sopenharmony_ci pub(crate) fn get_unsent_streams( 4626dbb5987Sopenharmony_ci &mut self, 4636dbb5987Sopenharmony_ci last_stream_id: StreamId, 4646dbb5987Sopenharmony_ci ) -> Result<Vec<StreamId>, H2Error> { 4656dbb5987Sopenharmony_ci // The last-stream-id in the subsequent GO_AWAY frame 4666dbb5987Sopenharmony_ci // cannot be greater than the last-stream-id in the previous GO_AWAY frame. 
4676dbb5987Sopenharmony_ci if self.streams.max_send_id < last_stream_id { 4686dbb5987Sopenharmony_ci return Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 4696dbb5987Sopenharmony_ci } 4706dbb5987Sopenharmony_ci self.streams.max_send_id = last_stream_id; 4716dbb5987Sopenharmony_ci Ok(self.streams.get_go_away_streams(last_stream_id)) 4726dbb5987Sopenharmony_ci } 4736dbb5987Sopenharmony_ci 4746dbb5987Sopenharmony_ci pub(crate) fn send_message_to_stream( 4756dbb5987Sopenharmony_ci &mut self, 4766dbb5987Sopenharmony_ci cx: &mut Context<'_>, 4776dbb5987Sopenharmony_ci stream_id: StreamId, 4786dbb5987Sopenharmony_ci message: RespMessage, 4796dbb5987Sopenharmony_ci ) -> Poll<Result<(), H2Error>> { 4806dbb5987Sopenharmony_ci if let Some(sender) = self.senders.get(&stream_id) { 4816dbb5987Sopenharmony_ci // If the client coroutine has exited, this frame is skipped. 4826dbb5987Sopenharmony_ci let mut tx = { 4836dbb5987Sopenharmony_ci let sender = sender.clone(); 4846dbb5987Sopenharmony_ci let ft = async move { sender.send(message).await }; 4856dbb5987Sopenharmony_ci Box::pin(ft) 4866dbb5987Sopenharmony_ci }; 4876dbb5987Sopenharmony_ci 4886dbb5987Sopenharmony_ci match tx.as_mut().poll(cx) { 4896dbb5987Sopenharmony_ci Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), 4906dbb5987Sopenharmony_ci // The current coroutine sending the request exited prematurely. 
4916dbb5987Sopenharmony_ci Poll::Ready(Err(_)) => { 4926dbb5987Sopenharmony_ci self.senders.remove(&stream_id); 4936dbb5987Sopenharmony_ci Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError))) 4946dbb5987Sopenharmony_ci } 4956dbb5987Sopenharmony_ci Poll::Pending => { 4966dbb5987Sopenharmony_ci self.curr_message.insert(stream_id, tx); 4976dbb5987Sopenharmony_ci Poll::Pending 4986dbb5987Sopenharmony_ci } 4996dbb5987Sopenharmony_ci } 5006dbb5987Sopenharmony_ci } else { 5016dbb5987Sopenharmony_ci Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError))) 5026dbb5987Sopenharmony_ci } 5036dbb5987Sopenharmony_ci } 5046dbb5987Sopenharmony_ci 5056dbb5987Sopenharmony_ci pub(crate) fn poll_blocked_message( 5066dbb5987Sopenharmony_ci &mut self, 5076dbb5987Sopenharmony_ci cx: &mut Context<'_>, 5086dbb5987Sopenharmony_ci input_tx: &UnboundedSender<Frame>, 5096dbb5987Sopenharmony_ci ) -> Poll<()> { 5106dbb5987Sopenharmony_ci let keys: Vec<StreamId> = self.curr_message.keys().cloned().collect(); 5116dbb5987Sopenharmony_ci let mut blocked = false; 5126dbb5987Sopenharmony_ci 5136dbb5987Sopenharmony_ci for key in keys { 5146dbb5987Sopenharmony_ci if let Some(mut task) = self.curr_message.remove(&key) { 5156dbb5987Sopenharmony_ci match task.as_mut().poll(cx) { 5166dbb5987Sopenharmony_ci Poll::Ready(Ok(_)) => {} 5176dbb5987Sopenharmony_ci // The current coroutine sending the request exited prematurely. 
5186dbb5987Sopenharmony_ci Poll::Ready(Err(_)) => { 5196dbb5987Sopenharmony_ci self.senders.remove(&key); 5206dbb5987Sopenharmony_ci if let Some(state) = self.streams.stream_state(key) { 5216dbb5987Sopenharmony_ci if !matches!(state, H2StreamState::Closed(_)) { 5226dbb5987Sopenharmony_ci if let StreamEndState::OK = self.streams.send_local_reset(key) { 5236dbb5987Sopenharmony_ci let rest_payload = 5246dbb5987Sopenharmony_ci RstStream::new(ErrorCode::NoError.into_code()); 5256dbb5987Sopenharmony_ci let frame = Frame::new( 5266dbb5987Sopenharmony_ci key, 5276dbb5987Sopenharmony_ci FrameFlags::empty(), 5286dbb5987Sopenharmony_ci Payload::RstStream(rest_payload), 5296dbb5987Sopenharmony_ci ); 5306dbb5987Sopenharmony_ci // ignore the send error occurs here in order to finish all 5316dbb5987Sopenharmony_ci // tasks. 5326dbb5987Sopenharmony_ci let _ = input_tx.send(frame); 5336dbb5987Sopenharmony_ci } 5346dbb5987Sopenharmony_ci } 5356dbb5987Sopenharmony_ci } 5366dbb5987Sopenharmony_ci } 5376dbb5987Sopenharmony_ci Poll::Pending => { 5386dbb5987Sopenharmony_ci self.curr_message.insert(key, task); 5396dbb5987Sopenharmony_ci blocked = true; 5406dbb5987Sopenharmony_ci } 5416dbb5987Sopenharmony_ci } 5426dbb5987Sopenharmony_ci } 5436dbb5987Sopenharmony_ci } 5446dbb5987Sopenharmony_ci if blocked { 5456dbb5987Sopenharmony_ci Poll::Pending 5466dbb5987Sopenharmony_ci } else { 5476dbb5987Sopenharmony_ci Poll::Ready(()) 5486dbb5987Sopenharmony_ci } 5496dbb5987Sopenharmony_ci } 5506dbb5987Sopenharmony_ci } 5516dbb5987Sopenharmony_ci 5526dbb5987Sopenharmony_ci impl RespReceiver { 5536dbb5987Sopenharmony_ci pub(crate) fn set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>) { 5546dbb5987Sopenharmony_ci self.receiver = Some(receiver); 5556dbb5987Sopenharmony_ci } 5566dbb5987Sopenharmony_ci 5576dbb5987Sopenharmony_ci pub(crate) async fn recv(&mut self) -> Result<Frame, HttpClientError> { 5586dbb5987Sopenharmony_ci match self.receiver { 5596dbb5987Sopenharmony_ci Some(ref mut 
receiver) => { 5606dbb5987Sopenharmony_ci #[cfg(feature = "tokio_base")] 5616dbb5987Sopenharmony_ci match receiver.recv().await { 5626dbb5987Sopenharmony_ci None => err_from_msg!(Request, "Response Receiver Closed !"), 5636dbb5987Sopenharmony_ci Some(message) => match message { 5646dbb5987Sopenharmony_ci RespMessage::Output(frame) => Ok(frame), 5656dbb5987Sopenharmony_ci RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 5666dbb5987Sopenharmony_ci }, 5676dbb5987Sopenharmony_ci } 5686dbb5987Sopenharmony_ci 5696dbb5987Sopenharmony_ci #[cfg(feature = "ylong_base")] 5706dbb5987Sopenharmony_ci match receiver.recv().await { 5716dbb5987Sopenharmony_ci Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)), 5726dbb5987Sopenharmony_ci Ok(message) => match message { 5736dbb5987Sopenharmony_ci RespMessage::Output(frame) => Ok(frame), 5746dbb5987Sopenharmony_ci RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 5756dbb5987Sopenharmony_ci }, 5766dbb5987Sopenharmony_ci } 5776dbb5987Sopenharmony_ci } 5786dbb5987Sopenharmony_ci // this will not happen. 
// NOTE(review): this span opens inside a receive method whose signature lies above the
// visible source; its closing arm and braces are reproduced unchanged.
                None => Err(HttpClientError::from_str(
                    ErrorKind::Request,
                    "Invalid Frame Receiver !",
                )),
            }
        }

        /// Polls the response receiver once for the next frame.
        ///
        /// * `Poll::Pending` is forwarded unchanged from the receiver.
        /// * A ready `RespMessage::Output` yields its frame.
        /// * A ready `RespMessage::OutputExit` is converted with
        ///   `dispatch_client_error`.
        /// * A failed receive, or the absence of a receiver, yields a `Request`
        ///   error.
        pub(crate) fn poll_recv(
            &mut self,
            cx: &mut Context<'_>,
        ) -> Poll<Result<Frame, HttpClientError>> {
            match self.receiver {
                // No receiver is attached to this handle.
                None => Poll::Ready(err_from_msg!(Request, "Invalid Frame Receiver !")),
                Some(ref mut receiver) => {
                    // Exactly one of the two runtime-specific matches below is expected
                    // to survive `cfg` and become the value of this arm.
                    #[cfg(feature = "tokio_base")]
                    match receiver.poll_recv(cx) {
                        Poll::Pending => Poll::Pending,
                        Poll::Ready(Some(RespMessage::Output(frame))) => Poll::Ready(Ok(frame)),
                        Poll::Ready(Some(RespMessage::OutputExit(e))) => {
                            Poll::Ready(Err(dispatch_client_error(e)))
                        }
                        Poll::Ready(None) => {
                            Poll::Ready(err_from_msg!(Request, "Error receive response !"))
                        }
                    }

                    #[cfg(feature = "ylong_base")]
                    match receiver.poll_recv(cx) {
                        Poll::Pending => Poll::Pending,
                        Poll::Ready(Ok(RespMessage::Output(frame))) => Poll::Ready(Ok(frame)),
                        Poll::Ready(Ok(RespMessage::OutputExit(e))) => {
                            Poll::Ready(Err(dispatch_client_error(e)))
                        }
                        Poll::Ready(Err(e)) => {
                            Poll::Ready(Err(HttpClientError::from_error(ErrorKind::Request, e)))
                        }
                    }
                }
            }
        }
    }

    /// Writes the HTTP/2 client connection preface to `writer`.
    ///
    /// An I/O failure is reported as `DispatchErrorKind::Io` carrying the
    /// error's kind.
    async fn async_send_preface<S>(writer: &mut WriteHalf<S>) -> Result<(), DispatchErrorKind>
    where
        S: AsyncWrite + Unpin,
    {
        const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
        match writer.write_all(PREFACE).await {
            Ok(()) => Ok(()),
            Err(io_err) => Err(DispatchErrorKind::Io(io_err.kind())),
        }
    }

    /// Builds the initial SETTINGS frame from the values stored in `config`.
    ///
    /// The frame is created with stream id `0` and flags value `0`.
    pub(crate) fn create_initial_settings(config: &H2Config) -> Frame {
        let builder = SettingsBuilder::new()
            .max_header_list_size(config.max_header_list_size())
            .max_frame_size(config.max_frame_size())
            .header_table_size(config.header_table_size())
            .enable_push(config.enable_push())
            .initial_window_size(config.stream_window_size());
        let payload = Payload::Settings(builder.build());

        Frame::new(0, FrameFlags::new(0), payload)
    }

    /// Keeps only the `ErrorKind` of an I/O error.
    impl From<std::io::Error> for DispatchErrorKind {
        fn from(io_err: std::io::Error) -> Self {
            Self::Io(io_err.kind())
        }
    }

    /// Wraps an HTTP/2 protocol error.
    impl From<H2Error> for DispatchErrorKind {
        fn from(h2_err: H2Error) -> Self {
            Self::H2(h2_err)
        }
    }

    /// Converts a dispatcher-level error into the `HttpClientError` returned to
    /// callers; every variant maps to the `Request` error kind.
    pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError {
        // Variants that carry a payload are converted directly; the remaining ones
        // only differ in their fixed message.
        let message = match dispatch_error {
            DispatchErrorKind::H2(e) => {
                return HttpClientError::from_error(Request, HttpError::from(e));
            }
            DispatchErrorKind::Io(kind) => {
                return HttpClientError::from_io_error(Request, std::io::Error::from(kind));
            }
            DispatchErrorKind::ChannelClosed => "Coroutine channel closed.",
            DispatchErrorKind::Disconnect => "remote peer closed.",
        };
        HttpClientError::from_str(Request, message)
    }
}

#[cfg(feature = "http3")]
pub(crate) mod http3 {
    use std::marker::PhantomData;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Mutex};

    use ylong_http::error::HttpError;
    use ylong_http::h3::{Frame, FrameDecoder, H3Error};

    use crate::async_impl::{ConnInfo, QuicConn};
    use crate::runtime::{
        bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, BoundedReceiver, BoundedSender,
        UnboundedSender,
    };
    use crate::util::config::H3Config;
    use crate::util::data_ref::BodyDataRef;
    use crate::util::dispatcher::{ConnDispatcher, Dispatcher};
    use crate::util::h3::io_manager::IOManager;
    use crate::util::h3::stream_manager::StreamManager;
    use crate::ErrorKind::Request;
    use crate::{ErrorKind, HttpClientError};

    /// Dispatcher for one HTTP/3 connection.
    ///
    /// Owns the two background tasks spawned in [`Http3Dispatcher::new`] and
    /// hands out [`Http3Conn`] handles that share its request channel.
    pub(crate) struct Http3Dispatcher<S> {
        /// Request channel sender; cloned into every handle by `dispatch`.
        pub(crate) req_tx: UnboundedSender<ReqMessage>,
        /// Join handles of the spawned tasks; aborted/cancelled on drop.
        pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>,
        pub(crate) _mark: PhantomData<S>,
        /// Flag reported by `is_shutdown`; a clone is given to the stream manager.
        pub(crate) io_shutdown: Arc<AtomicBool>,
        /// Flag reported by `is_goaway`; a clone is given to the stream manager.
        pub(crate) io_goaway: Arc<AtomicBool>,
    }

    /// Per-request handle produced by [`Http3Dispatcher`].
    pub(crate) struct Http3Conn<S> {
        /// Sender used by `send_frame_to_reader` to submit requests.
        pub(crate) sender: UnboundedSender<ReqMessage>,
        /// Receiving half of this handle's response channel, read by `recv_resp`.
        pub(crate) resp_receiver: BoundedReceiver<RespMessage>,
        /// Sending half of the response channel; a clone travels with each request.
        pub(crate) resp_sender: BoundedSender<RespMessage>,
        /// Shutdown flag shared with the dispatcher that created this handle.
        pub(crate) io_shutdown: Arc<AtomicBool>,
        pub(crate) _mark: PhantomData<S>,
    }

    /// A request as submitted to the dispatcher: the `header` frame together
    /// with a `BodyDataRef` in `data`.
    pub(crate) struct RequestWrapper {
        pub(crate) header: Frame,
        pub(crate) data: BodyDataRef,
    }

    /// Failure reasons that can be delivered through `RespMessage::OutputExit`.
    #[derive(Debug, Clone)]
    pub(crate) enum DispatchErrorKind {
        H3(H3Error),
        Io(std::io::ErrorKind),
        Quic(quiche::Error),
        ChannelClosed,
        StreamFinished,
        // todo: retry?
        GoawayReceived,
        Disconnect,
    }

    /// Message received on a handle's response channel.
    pub(crate) enum RespMessage {
        /// A response frame.
        Output(Frame),
        /// An error to be reported to the caller instead of a frame.
        OutputExit(DispatchErrorKind),
    }

    /// Message sent on the request channel: the request plus the sender on
    /// which its `RespMessage`s are to be delivered.
    pub(crate) struct ReqMessage {
        pub(crate) request: RequestWrapper,
        pub(crate) frame_tx: BoundedSender<RespMessage>,
    }

    impl<S> Http3Dispatcher<S>
    where
        S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
    {
        /// Creates the dispatcher and spawns its two background tasks.
        ///
        /// One task initialises a `StreamManager` with `config` and, if that
        /// succeeds, drives it to completion; the other drives an `IOManager`
        /// built from `io`. Both share `quic_connection` behind an
        /// `Arc<Mutex<_>>` and are linked by two unbounded channels.
        pub(crate) fn new(config: H3Config, io: S, quic_connection: QuicConn) -> Self {
            let (req_tx, req_rx) = unbounded_channel();
            let (io_manager_tx, io_manager_rx) = unbounded_channel();
            let (stream_manager_tx, stream_manager_rx) = unbounded_channel();
            let conn = Arc::new(Mutex::new(quic_connection));
            let io_shutdown = Arc::new(AtomicBool::new(false));
            let io_goaway = Arc::new(AtomicBool::new(false));

            let decoder = FrameDecoder::new(
                config.qpack_blocked_streams() as usize,
                config.qpack_max_table_capacity() as usize,
            );
            let mut stream_manager = StreamManager::new(
                Arc::clone(&conn),
                io_manager_tx,
                stream_manager_rx,
                req_rx,
                decoder,
                Arc::clone(&io_shutdown),
                Arc::clone(&io_goaway),
            );
            let stream_task = crate::runtime::spawn(async move {
                // The manager is only polled when initialisation succeeded; its
                // final result is intentionally discarded.
                if stream_manager.init(config).is_ok() {
                    let _ = Pin::new(&mut stream_manager).await;
                }
            });

            let io_task = crate::runtime::spawn(async move {
                let mut io_manager = IOManager::new(io, conn, io_manager_rx, stream_manager_tx);
                let _ = Pin::new(&mut io_manager).await;
            });

            // read_rx gets readable stream ids and writable client channels, then read
            // stream and send to the corresponding channel
            Self {
                req_tx,
                handles: vec![stream_task, io_task],
                _mark: PhantomData,
                io_shutdown,
                io_goaway,
            }
        }
    }

    impl<S> Http3Conn<S> {
        /// Creates a handle that submits requests through `sender` and owns a
        /// fresh bounded response channel.
        pub(crate) fn new(
            sender: UnboundedSender<ReqMessage>,
            io_shutdown: Arc<AtomicBool>,
        ) -> Self {
            const CHANNEL_SIZE: usize = 3;
            let (tx, rx) = bounded_channel(CHANNEL_SIZE);
            Self {
                sender,
                resp_receiver: rx,
                resp_sender: tx,
                io_shutdown,
                _mark: PhantomData,
            }
        }

        /// Submits `request` on the request channel together with a clone of
        /// this handle's response sender.
        ///
        /// # Errors
        /// Returns a `Request` error when the send fails.
        pub(crate) fn send_frame_to_reader(
            &mut self,
            request: RequestWrapper,
        ) -> Result<(), HttpClientError> {
            let message = ReqMessage {
                request,
                frame_tx: self.resp_sender.clone(),
            };
            match self.sender.send(message) {
                Ok(()) => Ok(()),
                Err(_) => Err(HttpClientError::from_str(
                    ErrorKind::Request,
                    "Request Sender Closed !",
                )),
            }
        }

        /// Waits for the next message on the response channel.
        ///
        /// # Errors
        /// Returns a `Request` error when the receive fails, or the converted
        /// dispatcher error when a `RespMessage::OutputExit` arrives.
        pub(crate) async fn recv_resp(&mut self) -> Result<Frame, HttpClientError> {
            #[cfg(feature = "tokio_base")]
            match self.resp_receiver.recv().await {
                Some(RespMessage::Output(frame)) => Ok(frame),
                Some(RespMessage::OutputExit(e)) => Err(dispatch_client_error(e)),
                None => err_from_msg!(Request, "Response Receiver Closed !"),
            }

            #[cfg(feature = "ylong_base")]
            match self.resp_receiver.recv().await {
                Ok(RespMessage::Output(frame)) => Ok(frame),
                Ok(RespMessage::OutputExit(e)) => Err(dispatch_client_error(e)),
                Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)),
            }
        }
    }

    impl<S> ConnDispatcher<S>
    where
        S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
    {
        /// Creates a `ConnDispatcher::Http3` wrapping a new [`Http3Dispatcher`].
        pub(crate) fn http3(config: H3Config, io: S, quic_connection: QuicConn) -> Self {
            let dispatcher = Http3Dispatcher::new(config, io, quic_connection);
            Self::Http3(dispatcher)
        }
    }

    impl<S> Dispatcher for Http3Dispatcher<S> {
        type Handle = Http3Conn<S>;

        /// Always returns a new handle sharing this dispatcher's request
        /// channel and shutdown flag.
        fn dispatch(&self) -> Option<Self::Handle> {
            Some(Http3Conn::new(
                self.req_tx.clone(),
                Arc::clone(&self.io_shutdown),
            ))
        }

        /// Reads the shared shutdown flag.
        fn is_shutdown(&self) -> bool {
            self.io_shutdown.load(Ordering::Relaxed)
        }

        /// Reads the shared goaway flag.
        fn is_goaway(&self) -> bool {
            self.io_goaway.load(Ordering::Relaxed)
        }
    }

    /// Stops both background tasks when the dispatcher is dropped.
    impl<S> Drop for Http3Dispatcher<S> {
        fn drop(&mut self) {
            self.handles.iter().for_each(|handle| {
                #[cfg(feature = "tokio_base")]
                handle.abort();
                #[cfg(feature = "ylong_base")]
                handle.cancel();
            });
        }
    }

    /// Keeps only the `ErrorKind` of an I/O error.
    impl From<std::io::Error> for DispatchErrorKind {
        fn from(io_err: std::io::Error) -> Self {
            Self::Io(io_err.kind())
        }
    }
// HTTP/3 error conversions and the mapping to `HttpClientError`.

    /// Wraps an HTTP/3 protocol error.
    impl From<H3Error> for DispatchErrorKind {
        fn from(h3_err: H3Error) -> Self {
            Self::H3(h3_err)
        }
    }

    /// Wraps a QUIC error reported by `quiche`.
    impl From<quiche::Error> for DispatchErrorKind {
        fn from(quic_err: quiche::Error) -> Self {
            Self::Quic(quic_err)
        }
    }

    /// Converts a dispatcher-level error into the `HttpClientError` returned to
    /// callers; every variant maps to the `Request` error kind.
    pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError {
        // Variants that carry a payload are converted directly; the remaining ones
        // only differ in their fixed message.
        let message = match dispatch_error {
            DispatchErrorKind::H3(e) => {
                return HttpClientError::from_error(Request, HttpError::from(e));
            }
            DispatchErrorKind::Io(kind) => {
                return HttpClientError::from_io_error(Request, std::io::Error::from(kind));
            }
            DispatchErrorKind::Quic(e) => return HttpClientError::from_error(Request, e),
            DispatchErrorKind::ChannelClosed => "Coroutine channel closed.",
            DispatchErrorKind::GoawayReceived => "received remote goaway.",
            DispatchErrorKind::StreamFinished => "stream finished.",
            DispatchErrorKind::Disconnect => "remote peer closed.",
        };
        HttpClientError::from_str(Request, message)
    }
}

// `ConnDispatcher::http1` builds the `Http1` variant, which only exists when the
// `http1_1` feature is enabled, so the whole test module is gated on that feature
// to keep test builds of other feature sets compiling.
#[cfg(all(test, feature = "http1_1"))]
mod ut_dispatch {
    use crate::dispatcher::{ConnDispatcher, Dispatcher};

    /// UT test cases for `ConnDispatcher::is_shutdown` and
    /// `ConnDispatcher::dispatch`.
    ///
    /// # Brief
    /// 1. Creates a `ConnDispatcher` with `ConnDispatcher::http1`.
    /// 2. Calls `ConnDispatcher::is_shutdown` and checks that the result is
    ///    false.
    /// 3. Calls `ConnDispatcher::dispatch` and checks that a handle is
    ///    returned.
    #[test]
    fn ut_is_shutdown() {
        let conn = ConnDispatcher::http1(b"Data");
        assert!(!conn.is_shutdown());
        assert!(conn.dispatch().is_some());
    }
}