16dbb5987Sopenharmony_ci// Copyright (c) 2023 Huawei Device Co., Ltd.
26dbb5987Sopenharmony_ci// Licensed under the Apache License, Version 2.0 (the "License");
36dbb5987Sopenharmony_ci// you may not use this file except in compliance with the License.
46dbb5987Sopenharmony_ci// You may obtain a copy of the License at
56dbb5987Sopenharmony_ci//
66dbb5987Sopenharmony_ci//     http://www.apache.org/licenses/LICENSE-2.0
76dbb5987Sopenharmony_ci//
86dbb5987Sopenharmony_ci// Unless required by applicable law or agreed to in writing, software
96dbb5987Sopenharmony_ci// distributed under the License is distributed on an "AS IS" BASIS,
106dbb5987Sopenharmony_ci// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
116dbb5987Sopenharmony_ci// See the License for the specific language governing permissions and
126dbb5987Sopenharmony_ci// limitations under the License.
136dbb5987Sopenharmony_ci
/// Common interface of the per-protocol connection managers: each one hands
/// out handles through which a request can use the managed connection.
pub(crate) trait Dispatcher {
    /// Handle type given to a caller that wants to use the connection.
    type Handle;

    /// Tries to obtain a handle for the managed connection.
    ///
    /// Returns `None` when no handle can be handed out right now.
    fn dispatch(&self) -> Option<Self::Handle>;

    /// Returns `true` once the connection has been flagged as shut down.
    fn is_shutdown(&self) -> bool;

    /// Returns `true` if the connection is in the go-away state.
    // NOTE(review): presumably not called under every feature combination,
    // hence the lint allowance -- confirm before removing it.
    #[allow(dead_code)]
    fn is_goaway(&self) -> bool;
}
246dbb5987Sopenharmony_ci
256dbb5987Sopenharmony_cipub(crate) enum ConnDispatcher<S> {
266dbb5987Sopenharmony_ci    #[cfg(feature = "http1_1")]
276dbb5987Sopenharmony_ci    Http1(http1::Http1Dispatcher<S>),
286dbb5987Sopenharmony_ci
296dbb5987Sopenharmony_ci    #[cfg(feature = "http2")]
306dbb5987Sopenharmony_ci    Http2(http2::Http2Dispatcher<S>),
316dbb5987Sopenharmony_ci
326dbb5987Sopenharmony_ci    #[cfg(feature = "http3")]
336dbb5987Sopenharmony_ci    Http3(http3::Http3Dispatcher<S>),
346dbb5987Sopenharmony_ci}
356dbb5987Sopenharmony_ci
366dbb5987Sopenharmony_ciimpl<S> Dispatcher for ConnDispatcher<S> {
376dbb5987Sopenharmony_ci    type Handle = Conn<S>;
386dbb5987Sopenharmony_ci
396dbb5987Sopenharmony_ci    fn dispatch(&self) -> Option<Self::Handle> {
406dbb5987Sopenharmony_ci        match self {
416dbb5987Sopenharmony_ci            #[cfg(feature = "http1_1")]
426dbb5987Sopenharmony_ci            Self::Http1(h1) => h1.dispatch().map(Conn::Http1),
436dbb5987Sopenharmony_ci
446dbb5987Sopenharmony_ci            #[cfg(feature = "http2")]
456dbb5987Sopenharmony_ci            Self::Http2(h2) => h2.dispatch().map(Conn::Http2),
466dbb5987Sopenharmony_ci
476dbb5987Sopenharmony_ci            #[cfg(feature = "http3")]
486dbb5987Sopenharmony_ci            Self::Http3(h3) => h3.dispatch().map(Conn::Http3),
496dbb5987Sopenharmony_ci        }
506dbb5987Sopenharmony_ci    }
516dbb5987Sopenharmony_ci
526dbb5987Sopenharmony_ci    fn is_shutdown(&self) -> bool {
536dbb5987Sopenharmony_ci        match self {
546dbb5987Sopenharmony_ci            #[cfg(feature = "http1_1")]
556dbb5987Sopenharmony_ci            Self::Http1(h1) => h1.is_shutdown(),
566dbb5987Sopenharmony_ci
576dbb5987Sopenharmony_ci            #[cfg(feature = "http2")]
586dbb5987Sopenharmony_ci            Self::Http2(h2) => h2.is_shutdown(),
596dbb5987Sopenharmony_ci
606dbb5987Sopenharmony_ci            #[cfg(feature = "http3")]
616dbb5987Sopenharmony_ci            Self::Http3(h3) => h3.is_shutdown(),
626dbb5987Sopenharmony_ci        }
636dbb5987Sopenharmony_ci    }
646dbb5987Sopenharmony_ci
656dbb5987Sopenharmony_ci    fn is_goaway(&self) -> bool {
666dbb5987Sopenharmony_ci        match self {
676dbb5987Sopenharmony_ci            #[cfg(feature = "http1_1")]
686dbb5987Sopenharmony_ci            Self::Http1(h1) => h1.is_goaway(),
696dbb5987Sopenharmony_ci
706dbb5987Sopenharmony_ci            #[cfg(feature = "http2")]
716dbb5987Sopenharmony_ci            Self::Http2(h2) => h2.is_goaway(),
726dbb5987Sopenharmony_ci
736dbb5987Sopenharmony_ci            #[cfg(feature = "http3")]
746dbb5987Sopenharmony_ci            Self::Http3(h3) => h3.is_goaway(),
756dbb5987Sopenharmony_ci        }
766dbb5987Sopenharmony_ci    }
776dbb5987Sopenharmony_ci}
786dbb5987Sopenharmony_ci
796dbb5987Sopenharmony_cipub(crate) enum Conn<S> {
806dbb5987Sopenharmony_ci    #[cfg(feature = "http1_1")]
816dbb5987Sopenharmony_ci    Http1(http1::Http1Conn<S>),
826dbb5987Sopenharmony_ci
836dbb5987Sopenharmony_ci    #[cfg(feature = "http2")]
846dbb5987Sopenharmony_ci    Http2(http2::Http2Conn<S>),
856dbb5987Sopenharmony_ci
866dbb5987Sopenharmony_ci    #[cfg(feature = "http3")]
876dbb5987Sopenharmony_ci    Http3(http3::Http3Conn<S>),
886dbb5987Sopenharmony_ci}
896dbb5987Sopenharmony_ci
#[cfg(feature = "http1_1")]
pub(crate) mod http1 {
    use std::cell::UnsafeCell;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    use super::{ConnDispatcher, Dispatcher};

    impl<S> ConnDispatcher<S> {
        /// Builds a [`ConnDispatcher::Http1`] that manages `io`.
        pub(crate) fn http1(io: S) -> Self {
            Self::Http1(Http1Dispatcher::new(io))
        }
    }

    /// HTTP1-based connection manager, which can dispatch connections to other
    /// threads according to HTTP1 syntax.
    pub(crate) struct Http1Dispatcher<S> {
        inner: Arc<Inner<S>>,
    }

    /// State shared between an [`Http1Dispatcher`] and the [`Http1Conn`]
    /// handle it hands out.
    pub(crate) struct Inner<S> {
        /// The underlying I/O object. It must only be accessed by the holder
        /// of the handle, i.e. while `occupied` is `true`.
        pub(crate) io: UnsafeCell<S>,
        /// `occupied` indicates that the connection is occupied. Only one
        /// coroutine can hold the handle at a time; the flag is set to `true`
        /// when the handle is fetched and reset when the handle is dropped.
        pub(crate) occupied: AtomicBool,
        /// `shutdown` indicates that the connection needs to be shut down.
        pub(crate) shutdown: AtomicBool,
    }

    // SAFETY: `io` is only reached through `Http1Conn::raw_mut`, and the
    // `occupied` flag guarantees that at most one `Http1Conn` exists at any
    // time, so `io` is never accessed concurrently. The single handle may
    // however live on a different thread than the one that created `io`,
    // which is sound only if `S` itself may be sent across threads -- hence
    // the `S: Send` bound (the same requirement `Mutex<T>: Sync` has).
    unsafe impl<S: Send> Sync for Inner<S> {}

    impl<S> Http1Dispatcher<S> {
        /// Creates a dispatcher for `io` that is neither occupied nor shut
        /// down.
        pub(crate) fn new(io: S) -> Self {
            Self {
                inner: Arc::new(Inner {
                    io: UnsafeCell::new(io),
                    occupied: AtomicBool::new(false),
                    shutdown: AtomicBool::new(false),
                }),
            }
        }
    }

    impl<S> Dispatcher for Http1Dispatcher<S> {
        type Handle = Http1Conn<S>;

        /// Hands out the connection handle if nobody else holds it.
        ///
        /// Returns `None` while another handle is alive.
        fn dispatch(&self) -> Option<Self::Handle> {
            // `Acquire` on success pairs with the `Release` store performed
            // when the previous handle is dropped, so everything the previous
            // holder did to `io` is visible to the new holder.
            self.inner
                .occupied
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .ok()
                .map(|_| Http1Conn {
                    inner: self.inner.clone(),
                })
        }

        /// Returns `true` once a handle has called [`Http1Conn::shutdown`].
        fn is_shutdown(&self) -> bool {
            // Pairs with the `Release` store in `Http1Conn::shutdown`.
            self.inner.shutdown.load(Ordering::Acquire)
        }

        /// Always `false` for this dispatcher.
        fn is_goaway(&self) -> bool {
            false
        }
    }

    /// Handle returned to other threads for I/O operations.
    pub(crate) struct Http1Conn<S> {
        pub(crate) inner: Arc<Inner<S>>,
    }

    impl<S> Http1Conn<S> {
        /// Returns exclusive access to the underlying I/O object.
        pub(crate) fn raw_mut(&mut self) -> &mut S {
            // SAFETY: In the case of `HTTP1`, only one coroutine gets the
            // handle at the same time (enforced by the `occupied` flag), and
            // `&mut self` prevents overlapping borrows through this handle.
            unsafe { &mut *self.inner.io.get() }
        }

        /// Flags the connection as needing to be shut down.
        pub(crate) fn shutdown(&self) {
            self.inner.shutdown.store(true, Ordering::Release);
        }
    }

    impl<S> Drop for Http1Conn<S> {
        /// Releases the connection so that the dispatcher can hand it out
        /// again.
        fn drop(&mut self) {
            self.inner.occupied.store(false, Ordering::Release)
        }
    }
}
1796dbb5987Sopenharmony_ci
1806dbb5987Sopenharmony_ci#[cfg(feature = "http2")]
1816dbb5987Sopenharmony_cipub(crate) mod http2 {
1826dbb5987Sopenharmony_ci    use std::collections::HashMap;
1836dbb5987Sopenharmony_ci    use std::future::Future;
1846dbb5987Sopenharmony_ci    use std::marker::PhantomData;
1856dbb5987Sopenharmony_ci    use std::pin::Pin;
1866dbb5987Sopenharmony_ci    use std::sync::atomic::{AtomicBool, Ordering};
1876dbb5987Sopenharmony_ci    use std::sync::{Arc, Mutex};
1886dbb5987Sopenharmony_ci    use std::task::{Context, Poll};
1896dbb5987Sopenharmony_ci
1906dbb5987Sopenharmony_ci    use ylong_http::error::HttpError;
1916dbb5987Sopenharmony_ci    use ylong_http::h2::{
1926dbb5987Sopenharmony_ci        ErrorCode, Frame, FrameDecoder, FrameEncoder, FrameFlags, Goaway, H2Error, Payload,
1936dbb5987Sopenharmony_ci        RstStream, Settings, SettingsBuilder, StreamId,
1946dbb5987Sopenharmony_ci    };
1956dbb5987Sopenharmony_ci
1966dbb5987Sopenharmony_ci    use crate::runtime::{
1976dbb5987Sopenharmony_ci        bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, AsyncWriteExt, BoundedReceiver,
1986dbb5987Sopenharmony_ci        BoundedSender, SendError, UnboundedReceiver, UnboundedSender, WriteHalf,
1996dbb5987Sopenharmony_ci    };
2006dbb5987Sopenharmony_ci    use crate::util::config::H2Config;
2016dbb5987Sopenharmony_ci    use crate::util::dispatcher::{ConnDispatcher, Dispatcher};
2026dbb5987Sopenharmony_ci    use crate::util::h2::{
2036dbb5987Sopenharmony_ci        ConnManager, FlowControl, H2StreamState, RecvData, RequestWrapper, SendData,
2046dbb5987Sopenharmony_ci        StreamEndState, Streams,
2056dbb5987Sopenharmony_ci    };
2066dbb5987Sopenharmony_ci    use crate::ErrorKind::Request;
2076dbb5987Sopenharmony_ci    use crate::{ErrorKind, HttpClientError};
2086dbb5987Sopenharmony_ci    const DEFAULT_MAX_FRAME_SIZE: usize = 2 << 13;
2096dbb5987Sopenharmony_ci    const DEFAULT_WINDOW_SIZE: u32 = 65535;
2106dbb5987Sopenharmony_ci
2116dbb5987Sopenharmony_ci    pub(crate) type ManagerSendFut =
2126dbb5987Sopenharmony_ci        Pin<Box<dyn Future<Output = Result<(), SendError<RespMessage>>> + Send + Sync>>;
2136dbb5987Sopenharmony_ci
2146dbb5987Sopenharmony_ci    pub(crate) enum RespMessage {
2156dbb5987Sopenharmony_ci        Output(Frame),
2166dbb5987Sopenharmony_ci        OutputExit(DispatchErrorKind),
2176dbb5987Sopenharmony_ci    }
2186dbb5987Sopenharmony_ci
2196dbb5987Sopenharmony_ci    pub(crate) enum OutputMessage {
2206dbb5987Sopenharmony_ci        Output(Frame),
2216dbb5987Sopenharmony_ci        OutputExit(DispatchErrorKind),
2226dbb5987Sopenharmony_ci    }
2236dbb5987Sopenharmony_ci
2246dbb5987Sopenharmony_ci    pub(crate) struct ReqMessage {
2256dbb5987Sopenharmony_ci        pub(crate) sender: BoundedSender<RespMessage>,
2266dbb5987Sopenharmony_ci        pub(crate) request: RequestWrapper,
2276dbb5987Sopenharmony_ci    }
2286dbb5987Sopenharmony_ci
2296dbb5987Sopenharmony_ci    #[derive(Debug, Eq, PartialEq, Copy, Clone)]
2306dbb5987Sopenharmony_ci    pub(crate) enum DispatchErrorKind {
2316dbb5987Sopenharmony_ci        H2(H2Error),
2326dbb5987Sopenharmony_ci        Io(std::io::ErrorKind),
2336dbb5987Sopenharmony_ci        ChannelClosed,
2346dbb5987Sopenharmony_ci        Disconnect,
2356dbb5987Sopenharmony_ci    }
2366dbb5987Sopenharmony_ci
2376dbb5987Sopenharmony_ci    // HTTP2-based connection manager, which can dispatch connections to other
2386dbb5987Sopenharmony_ci    // threads according to HTTP2 syntax.
2396dbb5987Sopenharmony_ci    pub(crate) struct Http2Dispatcher<S> {
2406dbb5987Sopenharmony_ci        pub(crate) allowed_cache: usize,
2416dbb5987Sopenharmony_ci        pub(crate) sender: UnboundedSender<ReqMessage>,
2426dbb5987Sopenharmony_ci        pub(crate) io_shutdown: Arc<AtomicBool>,
2436dbb5987Sopenharmony_ci        pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>,
2446dbb5987Sopenharmony_ci        pub(crate) _mark: PhantomData<S>,
2456dbb5987Sopenharmony_ci    }
2466dbb5987Sopenharmony_ci
2476dbb5987Sopenharmony_ci    pub(crate) struct Http2Conn<S> {
2486dbb5987Sopenharmony_ci        pub(crate) allow_cached_frames: usize,
2496dbb5987Sopenharmony_ci        // Sends frame to StreamController
2506dbb5987Sopenharmony_ci        pub(crate) sender: UnboundedSender<ReqMessage>,
2516dbb5987Sopenharmony_ci        pub(crate) receiver: RespReceiver,
2526dbb5987Sopenharmony_ci        pub(crate) io_shutdown: Arc<AtomicBool>,
2536dbb5987Sopenharmony_ci        pub(crate) _mark: PhantomData<S>,
2546dbb5987Sopenharmony_ci    }
2556dbb5987Sopenharmony_ci
2566dbb5987Sopenharmony_ci    pub(crate) struct StreamController {
2576dbb5987Sopenharmony_ci        // The connection close flag organizes new stream commits to the current connection when
2586dbb5987Sopenharmony_ci        // closed.
2596dbb5987Sopenharmony_ci        pub(crate) io_shutdown: Arc<AtomicBool>,
2606dbb5987Sopenharmony_ci        // The senders of all connected stream channels of response.
2616dbb5987Sopenharmony_ci        pub(crate) senders: HashMap<StreamId, BoundedSender<RespMessage>>,
2626dbb5987Sopenharmony_ci        pub(crate) curr_message: HashMap<StreamId, ManagerSendFut>,
2636dbb5987Sopenharmony_ci        // Stream information on the connection.
2646dbb5987Sopenharmony_ci        pub(crate) streams: Streams,
2656dbb5987Sopenharmony_ci        // Received GO_AWAY frame.
2666dbb5987Sopenharmony_ci        pub(crate) recved_go_away: Option<StreamId>,
2676dbb5987Sopenharmony_ci        // The last GO_AWAY frame sent by the client.
2686dbb5987Sopenharmony_ci        pub(crate) go_away_sync: GoAwaySync,
2696dbb5987Sopenharmony_ci    }
2706dbb5987Sopenharmony_ci
2716dbb5987Sopenharmony_ci    #[derive(Default)]
2726dbb5987Sopenharmony_ci    pub(crate) struct GoAwaySync {
2736dbb5987Sopenharmony_ci        pub(crate) going_away: Option<Goaway>,
2746dbb5987Sopenharmony_ci    }
2756dbb5987Sopenharmony_ci
2766dbb5987Sopenharmony_ci    #[derive(Default)]
2776dbb5987Sopenharmony_ci    pub(crate) struct SettingsSync {
2786dbb5987Sopenharmony_ci        pub(crate) settings: SettingsState,
2796dbb5987Sopenharmony_ci    }
2806dbb5987Sopenharmony_ci
2816dbb5987Sopenharmony_ci    #[derive(Default, Clone)]
2826dbb5987Sopenharmony_ci    pub(crate) enum SettingsState {
2836dbb5987Sopenharmony_ci        Acknowledging(Settings),
2846dbb5987Sopenharmony_ci        #[default]
2856dbb5987Sopenharmony_ci        Synced,
2866dbb5987Sopenharmony_ci    }
2876dbb5987Sopenharmony_ci
2886dbb5987Sopenharmony_ci    #[derive(Default)]
2896dbb5987Sopenharmony_ci    pub(crate) struct RespReceiver {
2906dbb5987Sopenharmony_ci        receiver: Option<BoundedReceiver<RespMessage>>,
2916dbb5987Sopenharmony_ci    }
2926dbb5987Sopenharmony_ci
2936dbb5987Sopenharmony_ci    impl<S> ConnDispatcher<S>
2946dbb5987Sopenharmony_ci    where
2956dbb5987Sopenharmony_ci        S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
2966dbb5987Sopenharmony_ci    {
2976dbb5987Sopenharmony_ci        pub(crate) fn http2(config: H2Config, io: S) -> Self {
2986dbb5987Sopenharmony_ci            Self::Http2(Http2Dispatcher::new(config, io))
2996dbb5987Sopenharmony_ci        }
3006dbb5987Sopenharmony_ci    }
3016dbb5987Sopenharmony_ci
3026dbb5987Sopenharmony_ci    impl<S> Http2Dispatcher<S>
3036dbb5987Sopenharmony_ci    where
3046dbb5987Sopenharmony_ci        S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
3056dbb5987Sopenharmony_ci    {
3066dbb5987Sopenharmony_ci        pub(crate) fn new(config: H2Config, io: S) -> Self {
3076dbb5987Sopenharmony_ci            let settings = create_initial_settings(&config);
3086dbb5987Sopenharmony_ci
3096dbb5987Sopenharmony_ci            let mut flow = FlowControl::new(DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE);
3106dbb5987Sopenharmony_ci            flow.setup_recv_window(config.conn_window_size());
3116dbb5987Sopenharmony_ci
3126dbb5987Sopenharmony_ci            let streams = Streams::new(config.stream_window_size(), DEFAULT_WINDOW_SIZE, flow);
3136dbb5987Sopenharmony_ci            let shutdown_flag = Arc::new(AtomicBool::new(false));
3146dbb5987Sopenharmony_ci            let controller = StreamController::new(streams, shutdown_flag.clone());
3156dbb5987Sopenharmony_ci
3166dbb5987Sopenharmony_ci            let (input_tx, input_rx) = unbounded_channel();
3176dbb5987Sopenharmony_ci            let (req_tx, req_rx) = unbounded_channel();
3186dbb5987Sopenharmony_ci
3196dbb5987Sopenharmony_ci            // Error is not possible, so it is not handled for the time
3206dbb5987Sopenharmony_ci            // being.
3216dbb5987Sopenharmony_ci            let mut handles = Vec::with_capacity(3);
3226dbb5987Sopenharmony_ci            if input_tx.send(settings).is_ok() {
3236dbb5987Sopenharmony_ci                Self::launch(
3246dbb5987Sopenharmony_ci                    config.allowed_cache_frame_size(),
3256dbb5987Sopenharmony_ci                    config.use_huffman_coding(),
3266dbb5987Sopenharmony_ci                    controller,
3276dbb5987Sopenharmony_ci                    (input_tx, input_rx),
3286dbb5987Sopenharmony_ci                    req_rx,
3296dbb5987Sopenharmony_ci                    &mut handles,
3306dbb5987Sopenharmony_ci                    io,
3316dbb5987Sopenharmony_ci                );
3326dbb5987Sopenharmony_ci            }
3336dbb5987Sopenharmony_ci            Self {
3346dbb5987Sopenharmony_ci                allowed_cache: config.allowed_cache_frame_size(),
3356dbb5987Sopenharmony_ci                sender: req_tx,
3366dbb5987Sopenharmony_ci                io_shutdown: shutdown_flag,
3376dbb5987Sopenharmony_ci                handles,
3386dbb5987Sopenharmony_ci                _mark: PhantomData,
3396dbb5987Sopenharmony_ci            }
3406dbb5987Sopenharmony_ci        }
3416dbb5987Sopenharmony_ci
3426dbb5987Sopenharmony_ci        fn launch(
3436dbb5987Sopenharmony_ci            allow_num: usize,
3446dbb5987Sopenharmony_ci            use_huffman: bool,
3456dbb5987Sopenharmony_ci            controller: StreamController,
3466dbb5987Sopenharmony_ci            input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>),
3476dbb5987Sopenharmony_ci            req_rx: UnboundedReceiver<ReqMessage>,
3486dbb5987Sopenharmony_ci            handles: &mut Vec<crate::runtime::JoinHandle<()>>,
3496dbb5987Sopenharmony_ci            io: S,
3506dbb5987Sopenharmony_ci        ) {
3516dbb5987Sopenharmony_ci            let (resp_tx, resp_rx) = bounded_channel(allow_num);
3526dbb5987Sopenharmony_ci            let (read, write) = crate::runtime::split(io);
3536dbb5987Sopenharmony_ci            let settings_sync = Arc::new(Mutex::new(SettingsSync::default()));
3546dbb5987Sopenharmony_ci            let send_settings_sync = settings_sync.clone();
3556dbb5987Sopenharmony_ci            let send = crate::runtime::spawn(async move {
3566dbb5987Sopenharmony_ci                let mut writer = write;
3576dbb5987Sopenharmony_ci                if async_send_preface(&mut writer).await.is_ok() {
3586dbb5987Sopenharmony_ci                    let encoder = FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, use_huffman);
3596dbb5987Sopenharmony_ci                    let mut send =
3606dbb5987Sopenharmony_ci                        SendData::new(encoder, send_settings_sync, writer, input_channel.1);
3616dbb5987Sopenharmony_ci                    let _ = Pin::new(&mut send).await;
3626dbb5987Sopenharmony_ci                }
3636dbb5987Sopenharmony_ci            });
3646dbb5987Sopenharmony_ci            handles.push(send);
3656dbb5987Sopenharmony_ci
3666dbb5987Sopenharmony_ci            let recv_settings_sync = settings_sync.clone();
3676dbb5987Sopenharmony_ci            let recv = crate::runtime::spawn(async move {
3686dbb5987Sopenharmony_ci                let decoder = FrameDecoder::new();
3696dbb5987Sopenharmony_ci                let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx);
3706dbb5987Sopenharmony_ci                let _ = Pin::new(&mut recv).await;
3716dbb5987Sopenharmony_ci            });
3726dbb5987Sopenharmony_ci            handles.push(recv);
3736dbb5987Sopenharmony_ci
3746dbb5987Sopenharmony_ci            let manager = crate::runtime::spawn(async move {
3756dbb5987Sopenharmony_ci                let mut conn_manager =
3766dbb5987Sopenharmony_ci                    ConnManager::new(settings_sync, input_channel.0, resp_rx, req_rx, controller);
3776dbb5987Sopenharmony_ci                let _ = Pin::new(&mut conn_manager).await;
3786dbb5987Sopenharmony_ci            });
3796dbb5987Sopenharmony_ci            handles.push(manager);
3806dbb5987Sopenharmony_ci        }
3816dbb5987Sopenharmony_ci    }
3826dbb5987Sopenharmony_ci
3836dbb5987Sopenharmony_ci    impl<S> Dispatcher for Http2Dispatcher<S> {
3846dbb5987Sopenharmony_ci        type Handle = Http2Conn<S>;
3856dbb5987Sopenharmony_ci
3866dbb5987Sopenharmony_ci        fn dispatch(&self) -> Option<Self::Handle> {
3876dbb5987Sopenharmony_ci            let sender = self.sender.clone();
3886dbb5987Sopenharmony_ci            let handle = Http2Conn::new(self.allowed_cache, self.io_shutdown.clone(), sender);
3896dbb5987Sopenharmony_ci            Some(handle)
3906dbb5987Sopenharmony_ci        }
3916dbb5987Sopenharmony_ci
3926dbb5987Sopenharmony_ci        fn is_shutdown(&self) -> bool {
3936dbb5987Sopenharmony_ci            self.io_shutdown.load(Ordering::Relaxed)
3946dbb5987Sopenharmony_ci        }
3956dbb5987Sopenharmony_ci
3966dbb5987Sopenharmony_ci        fn is_goaway(&self) -> bool {
3976dbb5987Sopenharmony_ci            // todo: goaway and shutdown
3986dbb5987Sopenharmony_ci            false
3996dbb5987Sopenharmony_ci        }
4006dbb5987Sopenharmony_ci    }
4016dbb5987Sopenharmony_ci
4026dbb5987Sopenharmony_ci    impl<S> Drop for Http2Dispatcher<S> {
4036dbb5987Sopenharmony_ci        fn drop(&mut self) {
4046dbb5987Sopenharmony_ci            for handle in &self.handles {
4056dbb5987Sopenharmony_ci                #[cfg(feature = "ylong_base")]
4066dbb5987Sopenharmony_ci                handle.cancel();
4076dbb5987Sopenharmony_ci                #[cfg(feature = "tokio_base")]
4086dbb5987Sopenharmony_ci                handle.abort();
4096dbb5987Sopenharmony_ci            }
4106dbb5987Sopenharmony_ci        }
4116dbb5987Sopenharmony_ci    }
4126dbb5987Sopenharmony_ci
4136dbb5987Sopenharmony_ci    impl<S> Http2Conn<S> {
4146dbb5987Sopenharmony_ci        pub(crate) fn new(
4156dbb5987Sopenharmony_ci            allow_cached_num: usize,
4166dbb5987Sopenharmony_ci            io_shutdown: Arc<AtomicBool>,
4176dbb5987Sopenharmony_ci            sender: UnboundedSender<ReqMessage>,
4186dbb5987Sopenharmony_ci        ) -> Self {
4196dbb5987Sopenharmony_ci            Self {
4206dbb5987Sopenharmony_ci                allow_cached_frames: allow_cached_num,
4216dbb5987Sopenharmony_ci                sender,
4226dbb5987Sopenharmony_ci                receiver: RespReceiver::default(),
4236dbb5987Sopenharmony_ci                io_shutdown,
4246dbb5987Sopenharmony_ci                _mark: PhantomData,
4256dbb5987Sopenharmony_ci            }
4266dbb5987Sopenharmony_ci        }
4276dbb5987Sopenharmony_ci
4286dbb5987Sopenharmony_ci        pub(crate) fn send_frame_to_controller(
4296dbb5987Sopenharmony_ci            &mut self,
4306dbb5987Sopenharmony_ci            request: RequestWrapper,
4316dbb5987Sopenharmony_ci        ) -> Result<(), HttpClientError> {
4326dbb5987Sopenharmony_ci            let (tx, rx) = bounded_channel::<RespMessage>(self.allow_cached_frames);
4336dbb5987Sopenharmony_ci            self.receiver.set_receiver(rx);
4346dbb5987Sopenharmony_ci            self.sender
4356dbb5987Sopenharmony_ci                .send(ReqMessage {
4366dbb5987Sopenharmony_ci                    sender: tx,
4376dbb5987Sopenharmony_ci                    request,
4386dbb5987Sopenharmony_ci                })
4396dbb5987Sopenharmony_ci                .map_err(|_| {
4406dbb5987Sopenharmony_ci                    HttpClientError::from_str(ErrorKind::Request, "Request Sender Closed !")
4416dbb5987Sopenharmony_ci                })
4426dbb5987Sopenharmony_ci        }
4436dbb5987Sopenharmony_ci    }
4446dbb5987Sopenharmony_ci
4456dbb5987Sopenharmony_ci    impl StreamController {
4466dbb5987Sopenharmony_ci        pub(crate) fn new(streams: Streams, shutdown: Arc<AtomicBool>) -> Self {
4476dbb5987Sopenharmony_ci            Self {
4486dbb5987Sopenharmony_ci                io_shutdown: shutdown,
4496dbb5987Sopenharmony_ci                senders: HashMap::new(),
4506dbb5987Sopenharmony_ci                curr_message: HashMap::new(),
4516dbb5987Sopenharmony_ci                streams,
4526dbb5987Sopenharmony_ci                recved_go_away: None,
4536dbb5987Sopenharmony_ci                go_away_sync: GoAwaySync::default(),
4546dbb5987Sopenharmony_ci            }
4556dbb5987Sopenharmony_ci        }
4566dbb5987Sopenharmony_ci
4576dbb5987Sopenharmony_ci        pub(crate) fn shutdown(&self) {
4586dbb5987Sopenharmony_ci            self.io_shutdown.store(true, Ordering::Release);
4596dbb5987Sopenharmony_ci        }
4606dbb5987Sopenharmony_ci
4616dbb5987Sopenharmony_ci        pub(crate) fn get_unsent_streams(
4626dbb5987Sopenharmony_ci            &mut self,
4636dbb5987Sopenharmony_ci            last_stream_id: StreamId,
4646dbb5987Sopenharmony_ci        ) -> Result<Vec<StreamId>, H2Error> {
4656dbb5987Sopenharmony_ci            // The last-stream-id in the subsequent GO_AWAY frame
4666dbb5987Sopenharmony_ci            // cannot be greater than the last-stream-id in the previous GO_AWAY frame.
4676dbb5987Sopenharmony_ci            if self.streams.max_send_id < last_stream_id {
4686dbb5987Sopenharmony_ci                return Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
4696dbb5987Sopenharmony_ci            }
4706dbb5987Sopenharmony_ci            self.streams.max_send_id = last_stream_id;
4716dbb5987Sopenharmony_ci            Ok(self.streams.get_go_away_streams(last_stream_id))
4726dbb5987Sopenharmony_ci        }
4736dbb5987Sopenharmony_ci
4746dbb5987Sopenharmony_ci        pub(crate) fn send_message_to_stream(
4756dbb5987Sopenharmony_ci            &mut self,
4766dbb5987Sopenharmony_ci            cx: &mut Context<'_>,
4776dbb5987Sopenharmony_ci            stream_id: StreamId,
4786dbb5987Sopenharmony_ci            message: RespMessage,
4796dbb5987Sopenharmony_ci        ) -> Poll<Result<(), H2Error>> {
4806dbb5987Sopenharmony_ci            if let Some(sender) = self.senders.get(&stream_id) {
4816dbb5987Sopenharmony_ci                // If the client coroutine has exited, this frame is skipped.
4826dbb5987Sopenharmony_ci                let mut tx = {
4836dbb5987Sopenharmony_ci                    let sender = sender.clone();
4846dbb5987Sopenharmony_ci                    let ft = async move { sender.send(message).await };
4856dbb5987Sopenharmony_ci                    Box::pin(ft)
4866dbb5987Sopenharmony_ci                };
4876dbb5987Sopenharmony_ci
4886dbb5987Sopenharmony_ci                match tx.as_mut().poll(cx) {
4896dbb5987Sopenharmony_ci                    Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
4906dbb5987Sopenharmony_ci                    // The current coroutine sending the request exited prematurely.
4916dbb5987Sopenharmony_ci                    Poll::Ready(Err(_)) => {
4926dbb5987Sopenharmony_ci                        self.senders.remove(&stream_id);
4936dbb5987Sopenharmony_ci                        Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError)))
4946dbb5987Sopenharmony_ci                    }
4956dbb5987Sopenharmony_ci                    Poll::Pending => {
4966dbb5987Sopenharmony_ci                        self.curr_message.insert(stream_id, tx);
4976dbb5987Sopenharmony_ci                        Poll::Pending
4986dbb5987Sopenharmony_ci                    }
4996dbb5987Sopenharmony_ci                }
5006dbb5987Sopenharmony_ci            } else {
5016dbb5987Sopenharmony_ci                Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError)))
5026dbb5987Sopenharmony_ci            }
5036dbb5987Sopenharmony_ci        }
5046dbb5987Sopenharmony_ci
5056dbb5987Sopenharmony_ci        pub(crate) fn poll_blocked_message(
5066dbb5987Sopenharmony_ci            &mut self,
5076dbb5987Sopenharmony_ci            cx: &mut Context<'_>,
5086dbb5987Sopenharmony_ci            input_tx: &UnboundedSender<Frame>,
5096dbb5987Sopenharmony_ci        ) -> Poll<()> {
5106dbb5987Sopenharmony_ci            let keys: Vec<StreamId> = self.curr_message.keys().cloned().collect();
5116dbb5987Sopenharmony_ci            let mut blocked = false;
5126dbb5987Sopenharmony_ci
5136dbb5987Sopenharmony_ci            for key in keys {
5146dbb5987Sopenharmony_ci                if let Some(mut task) = self.curr_message.remove(&key) {
5156dbb5987Sopenharmony_ci                    match task.as_mut().poll(cx) {
5166dbb5987Sopenharmony_ci                        Poll::Ready(Ok(_)) => {}
5176dbb5987Sopenharmony_ci                        // The current coroutine sending the request exited prematurely.
5186dbb5987Sopenharmony_ci                        Poll::Ready(Err(_)) => {
5196dbb5987Sopenharmony_ci                            self.senders.remove(&key);
5206dbb5987Sopenharmony_ci                            if let Some(state) = self.streams.stream_state(key) {
5216dbb5987Sopenharmony_ci                                if !matches!(state, H2StreamState::Closed(_)) {
5226dbb5987Sopenharmony_ci                                    if let StreamEndState::OK = self.streams.send_local_reset(key) {
5236dbb5987Sopenharmony_ci                                        let rest_payload =
5246dbb5987Sopenharmony_ci                                            RstStream::new(ErrorCode::NoError.into_code());
5256dbb5987Sopenharmony_ci                                        let frame = Frame::new(
5266dbb5987Sopenharmony_ci                                            key,
5276dbb5987Sopenharmony_ci                                            FrameFlags::empty(),
5286dbb5987Sopenharmony_ci                                            Payload::RstStream(rest_payload),
5296dbb5987Sopenharmony_ci                                        );
5306dbb5987Sopenharmony_ci                                        // ignore the send error occurs here in order to finish all
5316dbb5987Sopenharmony_ci                                        // tasks.
5326dbb5987Sopenharmony_ci                                        let _ = input_tx.send(frame);
5336dbb5987Sopenharmony_ci                                    }
5346dbb5987Sopenharmony_ci                                }
5356dbb5987Sopenharmony_ci                            }
5366dbb5987Sopenharmony_ci                        }
5376dbb5987Sopenharmony_ci                        Poll::Pending => {
5386dbb5987Sopenharmony_ci                            self.curr_message.insert(key, task);
5396dbb5987Sopenharmony_ci                            blocked = true;
5406dbb5987Sopenharmony_ci                        }
5416dbb5987Sopenharmony_ci                    }
5426dbb5987Sopenharmony_ci                }
5436dbb5987Sopenharmony_ci            }
5446dbb5987Sopenharmony_ci            if blocked {
5456dbb5987Sopenharmony_ci                Poll::Pending
5466dbb5987Sopenharmony_ci            } else {
5476dbb5987Sopenharmony_ci                Poll::Ready(())
5486dbb5987Sopenharmony_ci            }
5496dbb5987Sopenharmony_ci        }
5506dbb5987Sopenharmony_ci    }
5516dbb5987Sopenharmony_ci
5526dbb5987Sopenharmony_ci    impl RespReceiver {
5536dbb5987Sopenharmony_ci        pub(crate) fn set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>) {
5546dbb5987Sopenharmony_ci            self.receiver = Some(receiver);
5556dbb5987Sopenharmony_ci        }
5566dbb5987Sopenharmony_ci
5576dbb5987Sopenharmony_ci        pub(crate) async fn recv(&mut self) -> Result<Frame, HttpClientError> {
5586dbb5987Sopenharmony_ci            match self.receiver {
5596dbb5987Sopenharmony_ci                Some(ref mut receiver) => {
5606dbb5987Sopenharmony_ci                    #[cfg(feature = "tokio_base")]
5616dbb5987Sopenharmony_ci                    match receiver.recv().await {
5626dbb5987Sopenharmony_ci                        None => err_from_msg!(Request, "Response Receiver Closed !"),
5636dbb5987Sopenharmony_ci                        Some(message) => match message {
5646dbb5987Sopenharmony_ci                            RespMessage::Output(frame) => Ok(frame),
5656dbb5987Sopenharmony_ci                            RespMessage::OutputExit(e) => Err(dispatch_client_error(e)),
5666dbb5987Sopenharmony_ci                        },
5676dbb5987Sopenharmony_ci                    }
5686dbb5987Sopenharmony_ci
5696dbb5987Sopenharmony_ci                    #[cfg(feature = "ylong_base")]
5706dbb5987Sopenharmony_ci                    match receiver.recv().await {
5716dbb5987Sopenharmony_ci                        Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)),
5726dbb5987Sopenharmony_ci                        Ok(message) => match message {
5736dbb5987Sopenharmony_ci                            RespMessage::Output(frame) => Ok(frame),
5746dbb5987Sopenharmony_ci                            RespMessage::OutputExit(e) => Err(dispatch_client_error(e)),
5756dbb5987Sopenharmony_ci                        },
5766dbb5987Sopenharmony_ci                    }
5776dbb5987Sopenharmony_ci                }
5786dbb5987Sopenharmony_ci                // this will not happen.
5796dbb5987Sopenharmony_ci                None => Err(HttpClientError::from_str(
5806dbb5987Sopenharmony_ci                    ErrorKind::Request,
5816dbb5987Sopenharmony_ci                    "Invalid Frame Receiver !",
5826dbb5987Sopenharmony_ci                )),
5836dbb5987Sopenharmony_ci            }
5846dbb5987Sopenharmony_ci        }
5856dbb5987Sopenharmony_ci
5866dbb5987Sopenharmony_ci        pub(crate) fn poll_recv(
5876dbb5987Sopenharmony_ci            &mut self,
5886dbb5987Sopenharmony_ci            cx: &mut Context<'_>,
5896dbb5987Sopenharmony_ci        ) -> Poll<Result<Frame, HttpClientError>> {
5906dbb5987Sopenharmony_ci            if let Some(ref mut receiver) = self.receiver {
5916dbb5987Sopenharmony_ci                #[cfg(feature = "tokio_base")]
5926dbb5987Sopenharmony_ci                match receiver.poll_recv(cx) {
5936dbb5987Sopenharmony_ci                    Poll::Ready(None) => {
5946dbb5987Sopenharmony_ci                        Poll::Ready(err_from_msg!(Request, "Error receive response !"))
5956dbb5987Sopenharmony_ci                    }
5966dbb5987Sopenharmony_ci                    Poll::Ready(Some(message)) => match message {
5976dbb5987Sopenharmony_ci                        RespMessage::Output(frame) => Poll::Ready(Ok(frame)),
5986dbb5987Sopenharmony_ci                        RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))),
5996dbb5987Sopenharmony_ci                    },
6006dbb5987Sopenharmony_ci                    Poll::Pending => Poll::Pending,
6016dbb5987Sopenharmony_ci                }
6026dbb5987Sopenharmony_ci
6036dbb5987Sopenharmony_ci                #[cfg(feature = "ylong_base")]
6046dbb5987Sopenharmony_ci                match receiver.poll_recv(cx) {
6056dbb5987Sopenharmony_ci                    Poll::Ready(Err(e)) => {
6066dbb5987Sopenharmony_ci                        Poll::Ready(Err(HttpClientError::from_error(ErrorKind::Request, e)))
6076dbb5987Sopenharmony_ci                    }
6086dbb5987Sopenharmony_ci                    Poll::Ready(Ok(message)) => match message {
6096dbb5987Sopenharmony_ci                        RespMessage::Output(frame) => Poll::Ready(Ok(frame)),
6106dbb5987Sopenharmony_ci                        RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))),
6116dbb5987Sopenharmony_ci                    },
6126dbb5987Sopenharmony_ci                    Poll::Pending => Poll::Pending,
6136dbb5987Sopenharmony_ci                }
6146dbb5987Sopenharmony_ci            } else {
6156dbb5987Sopenharmony_ci                Poll::Ready(err_from_msg!(Request, "Invalid Frame Receiver !"))
6166dbb5987Sopenharmony_ci            }
6176dbb5987Sopenharmony_ci        }
6186dbb5987Sopenharmony_ci    }
6196dbb5987Sopenharmony_ci
6206dbb5987Sopenharmony_ci    async fn async_send_preface<S>(writer: &mut WriteHalf<S>) -> Result<(), DispatchErrorKind>
6216dbb5987Sopenharmony_ci    where
6226dbb5987Sopenharmony_ci        S: AsyncWrite + Unpin,
6236dbb5987Sopenharmony_ci    {
6246dbb5987Sopenharmony_ci        const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
6256dbb5987Sopenharmony_ci        writer
6266dbb5987Sopenharmony_ci            .write_all(PREFACE)
6276dbb5987Sopenharmony_ci            .await
6286dbb5987Sopenharmony_ci            .map_err(|e| DispatchErrorKind::Io(e.kind()))
6296dbb5987Sopenharmony_ci    }
6306dbb5987Sopenharmony_ci
6316dbb5987Sopenharmony_ci    pub(crate) fn create_initial_settings(config: &H2Config) -> Frame {
6326dbb5987Sopenharmony_ci        let settings = SettingsBuilder::new()
6336dbb5987Sopenharmony_ci            .max_header_list_size(config.max_header_list_size())
6346dbb5987Sopenharmony_ci            .max_frame_size(config.max_frame_size())
6356dbb5987Sopenharmony_ci            .header_table_size(config.header_table_size())
6366dbb5987Sopenharmony_ci            .enable_push(config.enable_push())
6376dbb5987Sopenharmony_ci            .initial_window_size(config.stream_window_size())
6386dbb5987Sopenharmony_ci            .build();
6396dbb5987Sopenharmony_ci
6406dbb5987Sopenharmony_ci        Frame::new(0, FrameFlags::new(0), Payload::Settings(settings))
6416dbb5987Sopenharmony_ci    }
6426dbb5987Sopenharmony_ci
6436dbb5987Sopenharmony_ci    impl From<std::io::Error> for DispatchErrorKind {
6446dbb5987Sopenharmony_ci        fn from(value: std::io::Error) -> Self {
6456dbb5987Sopenharmony_ci            DispatchErrorKind::Io(value.kind())
6466dbb5987Sopenharmony_ci        }
6476dbb5987Sopenharmony_ci    }
6486dbb5987Sopenharmony_ci
6496dbb5987Sopenharmony_ci    impl From<H2Error> for DispatchErrorKind {
6506dbb5987Sopenharmony_ci        fn from(err: H2Error) -> Self {
6516dbb5987Sopenharmony_ci            DispatchErrorKind::H2(err)
6526dbb5987Sopenharmony_ci        }
6536dbb5987Sopenharmony_ci    }
6546dbb5987Sopenharmony_ci
6556dbb5987Sopenharmony_ci    pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError {
6566dbb5987Sopenharmony_ci        match dispatch_error {
6576dbb5987Sopenharmony_ci            DispatchErrorKind::H2(e) => HttpClientError::from_error(Request, HttpError::from(e)),
6586dbb5987Sopenharmony_ci            DispatchErrorKind::Io(e) => {
6596dbb5987Sopenharmony_ci                HttpClientError::from_io_error(Request, std::io::Error::from(e))
6606dbb5987Sopenharmony_ci            }
6616dbb5987Sopenharmony_ci            DispatchErrorKind::ChannelClosed => {
6626dbb5987Sopenharmony_ci                HttpClientError::from_str(Request, "Coroutine channel closed.")
6636dbb5987Sopenharmony_ci            }
6646dbb5987Sopenharmony_ci            DispatchErrorKind::Disconnect => {
6656dbb5987Sopenharmony_ci                HttpClientError::from_str(Request, "remote peer closed.")
6666dbb5987Sopenharmony_ci            }
6676dbb5987Sopenharmony_ci        }
6686dbb5987Sopenharmony_ci    }
6696dbb5987Sopenharmony_ci}
6706dbb5987Sopenharmony_ci
6716dbb5987Sopenharmony_ci#[cfg(feature = "http3")]
6726dbb5987Sopenharmony_cipub(crate) mod http3 {
6736dbb5987Sopenharmony_ci    use std::marker::PhantomData;
6746dbb5987Sopenharmony_ci    use std::pin::Pin;
6756dbb5987Sopenharmony_ci    use std::sync::atomic::{AtomicBool, Ordering};
6766dbb5987Sopenharmony_ci    use std::sync::{Arc, Mutex};
6776dbb5987Sopenharmony_ci
6786dbb5987Sopenharmony_ci    use ylong_http::error::HttpError;
6796dbb5987Sopenharmony_ci    use ylong_http::h3::{Frame, FrameDecoder, H3Error};
6806dbb5987Sopenharmony_ci
6816dbb5987Sopenharmony_ci    use crate::async_impl::{ConnInfo, QuicConn};
6826dbb5987Sopenharmony_ci    use crate::runtime::{
6836dbb5987Sopenharmony_ci        bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, BoundedReceiver, BoundedSender,
6846dbb5987Sopenharmony_ci        UnboundedSender,
6856dbb5987Sopenharmony_ci    };
6866dbb5987Sopenharmony_ci    use crate::util::config::H3Config;
6876dbb5987Sopenharmony_ci    use crate::util::data_ref::BodyDataRef;
6886dbb5987Sopenharmony_ci    use crate::util::dispatcher::{ConnDispatcher, Dispatcher};
6896dbb5987Sopenharmony_ci    use crate::util::h3::io_manager::IOManager;
6906dbb5987Sopenharmony_ci    use crate::util::h3::stream_manager::StreamManager;
6916dbb5987Sopenharmony_ci    use crate::ErrorKind::Request;
6926dbb5987Sopenharmony_ci    use crate::{ErrorKind, HttpClientError};
6936dbb5987Sopenharmony_ci
/// Dispatcher for one HTTP/3 connection.
///
/// Owns the background tasks spawned in `Http3Dispatcher::new` and hands out
/// `Http3Conn` handles that submit requests through `req_tx`.
pub(crate) struct Http3Dispatcher<S> {
    /// Sending side of the request channel; cloned into every `Http3Conn`
    /// produced by `dispatch`.
    pub(crate) req_tx: UnboundedSender<ReqMessage>,
    /// Join handles of the spawned background tasks; they are aborted or
    /// cancelled when the dispatcher is dropped.
    pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>,
    /// Ties the otherwise unused stream type `S` to the dispatcher.
    pub(crate) _mark: PhantomData<S>,
    /// Flag shared with the stream manager; read by `is_shutdown`.
    pub(crate) io_shutdown: Arc<AtomicBool>,
    /// Flag shared with the stream manager; read by `is_goaway`.
    pub(crate) io_goaway: Arc<AtomicBool>,
}
7016dbb5987Sopenharmony_ci
/// Per-request handle onto an HTTP/3 connection, created by
/// `Http3Dispatcher::dispatch`.
pub(crate) struct Http3Conn<S> {
    /// Channel used to submit the request to the dispatcher's tasks.
    pub(crate) sender: UnboundedSender<ReqMessage>,
    /// Receiving side of this handle's response channel.
    pub(crate) resp_receiver: BoundedReceiver<RespMessage>,
    /// Sending side of the response channel; a clone travels with every
    /// submitted `ReqMessage`.
    pub(crate) resp_sender: BoundedSender<RespMessage>,
    /// Clone of the dispatcher's `io_shutdown` flag.
    pub(crate) io_shutdown: Arc<AtomicBool>,
    /// Ties the otherwise unused stream type `S` to the handle.
    pub(crate) _mark: PhantomData<S>,
}
7096dbb5987Sopenharmony_ci
/// A request as submitted to the dispatcher: one frame plus a body data
/// reference.
pub(crate) struct RequestWrapper {
    /// Frame sent for the request (presumably the HEADERS frame, judging by
    /// the field name -- confirm against the caller).
    pub(crate) header: Frame,
    /// Reference to the request body data.
    pub(crate) data: BodyDataRef,
}
7146dbb5987Sopenharmony_ci
/// Errors that can be reported by the HTTP/3 dispatcher tasks.
///
/// `dispatch_client_error` converts each variant into an `HttpClientError`.
#[derive(Debug, Clone)]
pub(crate) enum DispatchErrorKind {
    /// An HTTP/3 protocol error.
    H3(H3Error),
    /// An I/O failure, reduced to its `std::io::ErrorKind`.
    Io(std::io::ErrorKind),
    /// An error returned by the QUIC implementation (`quiche`).
    Quic(quiche::Error),
    /// Reported to callers as "Coroutine channel closed.".
    ChannelClosed,
    /// Reported to callers as "stream finished.".
    StreamFinished,
    /// Reported to callers as "received remote goaway.".
    // todo: retry?
    GoawayReceived,
    /// Reported to callers as "remote peer closed.".
    Disconnect,
}
7266dbb5987Sopenharmony_ci
/// Message delivered to a request handle over its response channel.
pub(crate) enum RespMessage {
    /// A frame received for the request.
    Output(Frame),
    /// The dispatcher reported an error; the handle turns it into an
    /// `HttpClientError`.
    OutputExit(DispatchErrorKind),
}
7316dbb5987Sopenharmony_ci
/// Message sent from an `Http3Conn` to the dispatcher for one request.
pub(crate) struct ReqMessage {
    /// The request to send.
    pub(crate) request: RequestWrapper,
    /// Channel on which responses for this request are sent back.
    pub(crate) frame_tx: BoundedSender<RespMessage>,
}
7366dbb5987Sopenharmony_ci
7376dbb5987Sopenharmony_ci    impl<S> Http3Dispatcher<S>
7386dbb5987Sopenharmony_ci    where
7396dbb5987Sopenharmony_ci        S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
7406dbb5987Sopenharmony_ci    {
7416dbb5987Sopenharmony_ci        pub(crate) fn new(config: H3Config, io: S, quic_connection: QuicConn) -> Self {
7426dbb5987Sopenharmony_ci            let (req_tx, req_rx) = unbounded_channel();
7436dbb5987Sopenharmony_ci            let (io_manager_tx, io_manager_rx) = unbounded_channel();
7446dbb5987Sopenharmony_ci            let (stream_manager_tx, stream_manager_rx) = unbounded_channel();
7456dbb5987Sopenharmony_ci            let mut handles = Vec::with_capacity(2);
7466dbb5987Sopenharmony_ci            let conn = Arc::new(Mutex::new(quic_connection));
7476dbb5987Sopenharmony_ci            let io_shutdown = Arc::new(AtomicBool::new(false));
7486dbb5987Sopenharmony_ci            let io_goaway = Arc::new(AtomicBool::new(false));
7496dbb5987Sopenharmony_ci            let mut stream_manager = StreamManager::new(
7506dbb5987Sopenharmony_ci                conn.clone(),
7516dbb5987Sopenharmony_ci                io_manager_tx,
7526dbb5987Sopenharmony_ci                stream_manager_rx,
7536dbb5987Sopenharmony_ci                req_rx,
7546dbb5987Sopenharmony_ci                FrameDecoder::new(
7556dbb5987Sopenharmony_ci                    config.qpack_blocked_streams() as usize,
7566dbb5987Sopenharmony_ci                    config.qpack_max_table_capacity() as usize,
7576dbb5987Sopenharmony_ci                ),
7586dbb5987Sopenharmony_ci                io_shutdown.clone(),
7596dbb5987Sopenharmony_ci                io_goaway.clone(),
7606dbb5987Sopenharmony_ci            );
7616dbb5987Sopenharmony_ci            let stream_handle = crate::runtime::spawn(async move {
7626dbb5987Sopenharmony_ci                if stream_manager.init(config).is_err() {
7636dbb5987Sopenharmony_ci                    return;
7646dbb5987Sopenharmony_ci                }
7656dbb5987Sopenharmony_ci                let _ = Pin::new(&mut stream_manager).await;
7666dbb5987Sopenharmony_ci            });
7676dbb5987Sopenharmony_ci            handles.push(stream_handle);
7686dbb5987Sopenharmony_ci
7696dbb5987Sopenharmony_ci            let io_handle = crate::runtime::spawn(async move {
7706dbb5987Sopenharmony_ci                let mut io_manager = IOManager::new(io, conn, io_manager_rx, stream_manager_tx);
7716dbb5987Sopenharmony_ci                let _ = Pin::new(&mut io_manager).await;
7726dbb5987Sopenharmony_ci            });
7736dbb5987Sopenharmony_ci            handles.push(io_handle);
7746dbb5987Sopenharmony_ci            // read_rx gets readable stream ids and writable client channels, then read
7756dbb5987Sopenharmony_ci            // stream and send to the corresponding channel
7766dbb5987Sopenharmony_ci            Self {
7776dbb5987Sopenharmony_ci                req_tx,
7786dbb5987Sopenharmony_ci                handles,
7796dbb5987Sopenharmony_ci                _mark: PhantomData,
7806dbb5987Sopenharmony_ci                io_shutdown,
7816dbb5987Sopenharmony_ci                io_goaway,
7826dbb5987Sopenharmony_ci            }
7836dbb5987Sopenharmony_ci        }
7846dbb5987Sopenharmony_ci    }
7856dbb5987Sopenharmony_ci
7866dbb5987Sopenharmony_ci    impl<S> Http3Conn<S> {
7876dbb5987Sopenharmony_ci        pub(crate) fn new(
7886dbb5987Sopenharmony_ci            sender: UnboundedSender<ReqMessage>,
7896dbb5987Sopenharmony_ci            io_shutdown: Arc<AtomicBool>,
7906dbb5987Sopenharmony_ci        ) -> Self {
7916dbb5987Sopenharmony_ci            const CHANNEL_SIZE: usize = 3;
7926dbb5987Sopenharmony_ci            let (resp_sender, resp_receiver) = bounded_channel(CHANNEL_SIZE);
7936dbb5987Sopenharmony_ci            Self {
7946dbb5987Sopenharmony_ci                sender,
7956dbb5987Sopenharmony_ci                resp_sender,
7966dbb5987Sopenharmony_ci                resp_receiver,
7976dbb5987Sopenharmony_ci                _mark: PhantomData,
7986dbb5987Sopenharmony_ci                io_shutdown,
7996dbb5987Sopenharmony_ci            }
8006dbb5987Sopenharmony_ci        }
8016dbb5987Sopenharmony_ci
8026dbb5987Sopenharmony_ci        pub(crate) fn send_frame_to_reader(
8036dbb5987Sopenharmony_ci            &mut self,
8046dbb5987Sopenharmony_ci            request: RequestWrapper,
8056dbb5987Sopenharmony_ci        ) -> Result<(), HttpClientError> {
8066dbb5987Sopenharmony_ci            self.sender
8076dbb5987Sopenharmony_ci                .send(ReqMessage {
8086dbb5987Sopenharmony_ci                    request,
8096dbb5987Sopenharmony_ci                    frame_tx: self.resp_sender.clone(),
8106dbb5987Sopenharmony_ci                })
8116dbb5987Sopenharmony_ci                .map_err(|_| {
8126dbb5987Sopenharmony_ci                    HttpClientError::from_str(ErrorKind::Request, "Request Sender Closed !")
8136dbb5987Sopenharmony_ci                })
8146dbb5987Sopenharmony_ci        }
8156dbb5987Sopenharmony_ci
8166dbb5987Sopenharmony_ci        pub(crate) async fn recv_resp(&mut self) -> Result<Frame, HttpClientError> {
8176dbb5987Sopenharmony_ci            #[cfg(feature = "tokio_base")]
8186dbb5987Sopenharmony_ci            match self.resp_receiver.recv().await {
8196dbb5987Sopenharmony_ci                None => err_from_msg!(Request, "Response Receiver Closed !"),
8206dbb5987Sopenharmony_ci                Some(message) => match message {
8216dbb5987Sopenharmony_ci                    RespMessage::Output(frame) => Ok(frame),
8226dbb5987Sopenharmony_ci                    RespMessage::OutputExit(e) => Err(dispatch_client_error(e)),
8236dbb5987Sopenharmony_ci                },
8246dbb5987Sopenharmony_ci            }
8256dbb5987Sopenharmony_ci
8266dbb5987Sopenharmony_ci            #[cfg(feature = "ylong_base")]
8276dbb5987Sopenharmony_ci            match self.resp_receiver.recv().await {
8286dbb5987Sopenharmony_ci                Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)),
8296dbb5987Sopenharmony_ci                Ok(message) => match message {
8306dbb5987Sopenharmony_ci                    RespMessage::Output(frame) => Ok(frame),
8316dbb5987Sopenharmony_ci                    RespMessage::OutputExit(e) => Err(dispatch_client_error(e)),
8326dbb5987Sopenharmony_ci                },
8336dbb5987Sopenharmony_ci            }
8346dbb5987Sopenharmony_ci        }
8356dbb5987Sopenharmony_ci    }
8366dbb5987Sopenharmony_ci
8376dbb5987Sopenharmony_ci    impl<S> ConnDispatcher<S>
8386dbb5987Sopenharmony_ci    where
8396dbb5987Sopenharmony_ci        S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
8406dbb5987Sopenharmony_ci    {
8416dbb5987Sopenharmony_ci        pub(crate) fn http3(config: H3Config, io: S, quic_connection: QuicConn) -> Self {
8426dbb5987Sopenharmony_ci            Self::Http3(Http3Dispatcher::new(config, io, quic_connection))
8436dbb5987Sopenharmony_ci        }
8446dbb5987Sopenharmony_ci    }
8456dbb5987Sopenharmony_ci
8466dbb5987Sopenharmony_ci    impl<S> Dispatcher for Http3Dispatcher<S> {
8476dbb5987Sopenharmony_ci        type Handle = Http3Conn<S>;
8486dbb5987Sopenharmony_ci
8496dbb5987Sopenharmony_ci        fn dispatch(&self) -> Option<Self::Handle> {
8506dbb5987Sopenharmony_ci            let sender = self.req_tx.clone();
8516dbb5987Sopenharmony_ci            Some(Http3Conn::new(sender, self.io_shutdown.clone()))
8526dbb5987Sopenharmony_ci        }
8536dbb5987Sopenharmony_ci
8546dbb5987Sopenharmony_ci        fn is_shutdown(&self) -> bool {
8556dbb5987Sopenharmony_ci            self.io_shutdown.load(Ordering::Relaxed)
8566dbb5987Sopenharmony_ci        }
8576dbb5987Sopenharmony_ci
8586dbb5987Sopenharmony_ci        fn is_goaway(&self) -> bool {
8596dbb5987Sopenharmony_ci            self.io_goaway.load(Ordering::Relaxed)
8606dbb5987Sopenharmony_ci        }
8616dbb5987Sopenharmony_ci    }
8626dbb5987Sopenharmony_ci
8636dbb5987Sopenharmony_ci    impl<S> Drop for Http3Dispatcher<S> {
8646dbb5987Sopenharmony_ci        fn drop(&mut self) {
8656dbb5987Sopenharmony_ci            for handle in &self.handles {
8666dbb5987Sopenharmony_ci                #[cfg(feature = "tokio_base")]
8676dbb5987Sopenharmony_ci                handle.abort();
8686dbb5987Sopenharmony_ci                #[cfg(feature = "ylong_base")]
8696dbb5987Sopenharmony_ci                handle.cancel();
8706dbb5987Sopenharmony_ci            }
8716dbb5987Sopenharmony_ci        }
8726dbb5987Sopenharmony_ci    }
8736dbb5987Sopenharmony_ci
8746dbb5987Sopenharmony_ci    impl From<std::io::Error> for DispatchErrorKind {
8756dbb5987Sopenharmony_ci        fn from(value: std::io::Error) -> Self {
8766dbb5987Sopenharmony_ci            DispatchErrorKind::Io(value.kind())
8776dbb5987Sopenharmony_ci        }
8786dbb5987Sopenharmony_ci    }
8796dbb5987Sopenharmony_ci
8806dbb5987Sopenharmony_ci    impl From<H3Error> for DispatchErrorKind {
8816dbb5987Sopenharmony_ci        fn from(err: H3Error) -> Self {
8826dbb5987Sopenharmony_ci            DispatchErrorKind::H3(err)
8836dbb5987Sopenharmony_ci        }
8846dbb5987Sopenharmony_ci    }
8856dbb5987Sopenharmony_ci
8866dbb5987Sopenharmony_ci    impl From<quiche::Error> for DispatchErrorKind {
8876dbb5987Sopenharmony_ci        fn from(value: quiche::Error) -> Self {
8886dbb5987Sopenharmony_ci            DispatchErrorKind::Quic(value)
8896dbb5987Sopenharmony_ci        }
8906dbb5987Sopenharmony_ci    }
8916dbb5987Sopenharmony_ci
8926dbb5987Sopenharmony_ci    pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError {
8936dbb5987Sopenharmony_ci        match dispatch_error {
8946dbb5987Sopenharmony_ci            DispatchErrorKind::H3(e) => HttpClientError::from_error(Request, HttpError::from(e)),
8956dbb5987Sopenharmony_ci            DispatchErrorKind::Io(e) => {
8966dbb5987Sopenharmony_ci                HttpClientError::from_io_error(Request, std::io::Error::from(e))
8976dbb5987Sopenharmony_ci            }
8986dbb5987Sopenharmony_ci            DispatchErrorKind::ChannelClosed => {
8996dbb5987Sopenharmony_ci                HttpClientError::from_str(Request, "Coroutine channel closed.")
9006dbb5987Sopenharmony_ci            }
9016dbb5987Sopenharmony_ci            DispatchErrorKind::Quic(e) => HttpClientError::from_error(Request, e),
9026dbb5987Sopenharmony_ci            DispatchErrorKind::GoawayReceived => {
9036dbb5987Sopenharmony_ci                HttpClientError::from_str(Request, "received remote goaway.")
9046dbb5987Sopenharmony_ci            }
9056dbb5987Sopenharmony_ci            DispatchErrorKind::StreamFinished => {
9066dbb5987Sopenharmony_ci                HttpClientError::from_str(Request, "stream finished.")
9076dbb5987Sopenharmony_ci            }
9086dbb5987Sopenharmony_ci            DispatchErrorKind::Disconnect => {
9096dbb5987Sopenharmony_ci                HttpClientError::from_str(Request, "remote peer closed.")
9106dbb5987Sopenharmony_ci            }
9116dbb5987Sopenharmony_ci        }
9126dbb5987Sopenharmony_ci    }
9136dbb5987Sopenharmony_ci}
9146dbb5987Sopenharmony_ci
// The test builds an HTTP/1.1 dispatcher, which only exists when the
// `http1_1` feature is enabled, so the module is gated on that feature too.
#[cfg(all(test, feature = "http1_1"))]
mod ut_dispatch {
    use crate::dispatcher::{ConnDispatcher, Dispatcher};

    /// UT test cases for `ConnDispatcher::is_shutdown` and
    /// `ConnDispatcher::dispatch`.
    ///
    /// # Brief
    /// 1. Creates a `ConnDispatcher`.
    /// 2. Calls `ConnDispatcher::is_shutdown` and checks that the result is
    ///    false.
    /// 3. Calls `ConnDispatcher::dispatch` and checks that a handle is
    ///    returned.
    #[test]
    fn ut_is_shutdown() {
        let conn = ConnDispatcher::http1(b"Data");
        assert!(!conn.is_shutdown());
        assert!(conn.dispatch().is_some());
    }
}
935