1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::mem::take;
15 use std::sync::{Arc, Mutex};
16 
17 #[cfg(feature = "http3")]
18 use ylong_http::request::uri::Authority;
19 #[cfg(any(feature = "http2", feature = "http3"))]
20 use ylong_http::request::uri::Scheme;
21 use ylong_http::request::uri::Uri;
22 
23 use crate::async_impl::connector::ConnInfo;
24 #[cfg(feature = "http3")]
25 use crate::async_impl::quic::QuicConn;
26 use crate::async_impl::Connector;
27 #[cfg(feature = "http3")]
28 use crate::async_impl::Response;
29 use crate::error::HttpClientError;
30 use crate::runtime::{AsyncRead, AsyncWrite};
31 #[cfg(feature = "http3")]
32 use crate::util::alt_svc::{AltService, AltServiceMap};
33 #[cfg(feature = "http2")]
34 use crate::util::config::H2Config;
35 #[cfg(feature = "http3")]
36 use crate::util::config::H3Config;
37 use crate::util::config::{HttpConfig, HttpVersion};
38 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher};
39 use crate::util::pool::{Pool, PoolKey};
40 #[cfg(feature = "http3")]
41 use crate::util::request::RequestArc;
42 
43 pub(crate) struct ConnPool<C, S> {
44     pool: Pool<PoolKey, Conns<S>>,
45     #[cfg(feature = "http3")]
46     alt_svcs: AltServiceMap,
47     connector: Arc<C>,
48     config: HttpConfig,
49 }
50 
51 impl<C: Connector> ConnPool<C, C::Stream> {
52     pub(crate) fn new(config: HttpConfig, connector: C) -> Self {
53         Self {
54             pool: Pool::new(),
55             #[cfg(feature = "http3")]
56             alt_svcs: AltServiceMap::new(),
57             connector: Arc::new(connector),
58             config,
59         }
60     }
61 
62     pub(crate) async fn connect_to(&self, uri: &Uri) -> Result<Conn<C::Stream>, HttpClientError> {
63         let key = PoolKey::new(
64             uri.scheme().unwrap().clone(),
65             uri.authority().unwrap().clone(),
66         );
67 
68         #[cfg(feature = "http3")]
69         let alt_svc = self.alt_svcs.get_alt_svcs(&key);
70 
71         self.pool
72             .get(key, Conns::new)
73             .conn(
74                 self.config.clone(),
75                 self.connector.clone(),
76                 uri,
77                 #[cfg(feature = "http3")]
78                 alt_svc,
79             )
80             .await
81     }
82 
83     #[cfg(feature = "http3")]
84     pub(crate) fn set_alt_svcs(&self, request: RequestArc, response: &Response) {
85         self.alt_svcs.set_alt_svcs(request, response);
86     }
87 }
88 
89 pub(crate) struct Conns<S> {
90     list: Arc<Mutex<Vec<ConnDispatcher<S>>>>,
91     #[cfg(feature = "http2")]
92     h2_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>,
93     #[cfg(feature = "http3")]
94     h3_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>,
95 }
96 
97 impl<S> Conns<S> {
newnull98     fn new() -> Self {
99         Self {
100             list: Arc::new(Mutex::new(Vec::new())),
101 
102             #[cfg(feature = "http2")]
103             h2_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))),
104 
105             #[cfg(feature = "http3")]
106             h3_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))),
107         }
108     }
109 
110     // fn get_alt_svcs
111 }
112 
113 impl<S> Clone for Conns<S> {
clonenull114     fn clone(&self) -> Self {
115         Self {
116             list: self.list.clone(),
117 
118             #[cfg(feature = "http2")]
119             h2_conn: self.h2_conn.clone(),
120 
121             #[cfg(feature = "http3")]
122             h3_conn: self.h3_conn.clone(),
123         }
124     }
125 }
126 
127 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Send + Sync + 'static> Conns<S> {
128     async fn conn<C>(
129         &mut self,
130         config: HttpConfig,
131         connector: Arc<C>,
132         url: &Uri,
133         #[cfg(feature = "http3")] alt_svc: Option<Vec<AltService>>,
134     ) -> Result<Conn<S>, HttpClientError>
135     where
136         C: Connector<Stream = S>,
137     {
138         match config.version {
139             #[cfg(feature = "http3")]
140             HttpVersion::Http3 => self.conn_h3(connector, url, config.http3_config).await,
141             #[cfg(feature = "http2")]
142             HttpVersion::Http2 => self.conn_h2(connector, url, config.http2_config).await,
143             #[cfg(feature = "http1_1")]
144             HttpVersion::Http1 => self.conn_h1(connector, url).await,
145             HttpVersion::Negotiate => {
146                 #[cfg(feature = "http3")]
147                 if let Some(conn) = self
148                     .conn_alt_svc(&connector, url, alt_svc, config.http3_config)
149                     .await
150                 {
151                     return Ok(conn);
152                 }
153 
154                 #[cfg(all(feature = "http1_1", not(feature = "http2")))]
155                 return self.conn_h1(connector, url).await;
156 
157                 #[cfg(all(feature = "http2", feature = "http1_1"))]
158                 return self
159                     .conn_negotiate(connector, url, config.http2_config)
160                     .await;
161             }
162         }
163     }
164 
165     async fn conn_h1<C>(&self, connector: Arc<C>, url: &Uri) -> Result<Conn<S>, HttpClientError>
166     where
167         C: Connector<Stream = S>,
168     {
169         if let Some(conn) = self.exist_h1_conn() {
170             return Ok(conn);
171         }
172         let dispatcher = ConnDispatcher::http1(connector.connect(url, HttpVersion::Http1).await?);
173         Ok(self.dispatch_h1_conn(dispatcher))
174     }
175 
176     #[cfg(feature = "http2")]
177     async fn conn_h2<C>(
178         &self,
179         connector: Arc<C>,
180         url: &Uri,
181         config: H2Config,
182     ) -> Result<Conn<S>, HttpClientError>
183     where
184         C: Connector<Stream = S>,
185     {
186         // The lock `h2_occupation` is used to prevent multiple coroutines from sending
187         // Requests at the same time under concurrent conditions,
188         // resulting in the creation of multiple tcp connections
189         let mut lock = self.h2_conn.lock().await;
190 
191         if let Some(conn) = Self::exist_h2_conn(&mut lock) {
192             return Ok(conn);
193         }
194         let stream = connector.connect(url, HttpVersion::Http2).await?;
195         let details = stream.conn_detail();
196         let tls = if let Some(scheme) = url.scheme() {
197             *scheme == Scheme::HTTPS
198         } else {
199             false
200         };
201         match details.alpn() {
202             None if tls => return err_from_msg!(Connect, "The peer does not support http/2."),
203             Some(protocol) if protocol != b"h2" => {
204                 return err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.")
205             }
206             _ => {}
207         }
208 
209         Ok(Self::dispatch_h2_conn(config, stream, &mut lock))
210     }
211 
212     #[cfg(feature = "http3")]
213     async fn conn_h3<C>(
214         &self,
215         connector: Arc<C>,
216         url: &Uri,
217         config: H3Config,
218     ) -> Result<Conn<S>, HttpClientError>
219     where
220         C: Connector<Stream = S>,
221     {
222         let mut lock = self.h3_conn.lock().await;
223 
224         if let Some(conn) = Self::exist_h3_conn(&mut lock) {
225             return Ok(conn);
226         }
227         let mut stream = connector.connect(url, HttpVersion::Http3).await?;
228         let quic_conn = stream.quic_conn().ok_or(HttpClientError::from_str(
229             crate::ErrorKind::Connect,
230             "QUIC connect failed",
231         ))?;
232 
233         Ok(Self::dispatch_h3_conn(config, stream, quic_conn, &mut lock))
234     }
235 
236     #[cfg(all(feature = "http2", feature = "http1_1"))]
237     async fn conn_negotiate<C>(
238         &self,
239         connector: Arc<C>,
240         url: &Uri,
241         h2_config: H2Config,
242     ) -> Result<Conn<S>, HttpClientError>
243     where
244         C: Connector<Stream = S>,
245     {
246         match *url.scheme().unwrap() {
247             Scheme::HTTPS => {
248                 let mut lock = self.h2_conn.lock().await;
249                 if let Some(conn) = Self::exist_h2_conn(&mut lock) {
250                     return Ok(conn);
251                 }
252 
253                 if let Some(conn) = self.exist_h1_conn() {
254                     return Ok(conn);
255                 }
256 
257                 let stream = connector.connect(url, HttpVersion::Negotiate).await?;
258                 let details = stream.conn_detail();
259 
260                 let protocol = if let Some(bytes) = details.alpn() {
261                     bytes
262                 } else {
263                     let dispatcher = ConnDispatcher::http1(stream);
264                     return Ok(self.dispatch_h1_conn(dispatcher));
265                 };
266 
267                 if protocol == b"http/1.1" {
268                     let dispatcher = ConnDispatcher::http1(stream);
269                     Ok(self.dispatch_h1_conn(dispatcher))
270                 } else if protocol == b"h2" {
271                     Ok(Self::dispatch_h2_conn(h2_config, stream, &mut lock))
272                 } else {
273                     err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.")
274                 }
275             }
276             Scheme::HTTP => self.conn_h1(connector, url).await,
277         }
278     }
279 
280     #[cfg(feature = "http3")]
281     async fn conn_alt_svc<C>(
282         &self,
283         connector: &Arc<C>,
284         url: &Uri,
285         alt_svcs: Option<Vec<AltService>>,
286         h3_config: H3Config,
287     ) -> Option<Conn<S>>
288     where
289         C: Connector<Stream = S>,
290     {
291         let mut lock = self.h3_conn.lock().await;
292         if let Some(conn) = Self::exist_h3_conn(&mut lock) {
293             return Some(conn);
294         }
295         if let Some(alt_svcs) = alt_svcs {
296             for alt_svc in alt_svcs {
297                 // only support h3 alt_svc now
298                 if alt_svc.http_version != HttpVersion::Http3 {
299                     continue;
300                 }
301                 let scheme = Scheme::HTTPS;
302                 let host = match alt_svc.host {
303                     Some(ref host) => host.clone(),
304                     None => url.host().cloned().unwrap(),
305                 };
306                 let port = alt_svc.port.clone();
307                 let authority =
308                     Authority::from_bytes((host.to_string() + ":" + port.as_str()).as_bytes())
309                         .ok()?;
310                 let path = url.path().cloned();
311                 let query = url.query().cloned();
312                 let alt_url = Uri::from_raw_parts(Some(scheme), Some(authority), path, query);
313                 let mut stream = connector.connect(&alt_url, HttpVersion::Http3).await.ok()?;
314                 let quic_conn = stream.quic_conn().unwrap();
315                 return Some(Self::dispatch_h3_conn(
316                     h3_config.clone(),
317                     stream,
318                     quic_conn,
319                     &mut lock,
320                 ));
321             }
322         }
323         None
324     }
325 
dispatch_h1_connnull326     fn dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>) -> Conn<S> {
327         // We must be able to get the `Conn` here.
328         let conn = dispatcher.dispatch().unwrap();
329         let mut list = self.list.lock().unwrap();
330         list.push(dispatcher);
331 
332         conn
333     }
334 
335     #[cfg(feature = "http2")]
dispatch_h2_connnull336     fn dispatch_h2_conn(
337         config: H2Config,
338         stream: S,
339         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
340     ) -> Conn<S> {
341         let dispatcher = ConnDispatcher::http2(config, stream);
342         let conn = dispatcher.dispatch().unwrap();
343         lock.push(dispatcher);
344         conn
345     }
346 
347     #[cfg(feature = "http3")]
dispatch_h3_connnull348     fn dispatch_h3_conn(
349         config: H3Config,
350         stream: S,
351         quic_connection: QuicConn,
352         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
353     ) -> Conn<S> {
354         let dispatcher = ConnDispatcher::http3(config, stream, quic_connection);
355         let conn = dispatcher.dispatch().unwrap();
356         lock.push(dispatcher);
357         conn
358     }
359 
exist_h1_connnull360     fn exist_h1_conn(&self) -> Option<Conn<S>> {
361         let mut list = self.list.lock().unwrap();
362         let mut conn = None;
363         let curr = take(&mut *list);
364         // TODO Distinguish between http2 connections and http1 connections.
365         for dispatcher in curr.into_iter() {
366             // Discard invalid dispatchers.
367             if dispatcher.is_shutdown() {
368                 continue;
369             }
370             if conn.is_none() {
371                 conn = dispatcher.dispatch();
372             }
373             list.push(dispatcher);
374         }
375         conn
376     }
377 
378     #[cfg(feature = "http2")]
exist_h2_connnull379     fn exist_h2_conn(
380         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
381     ) -> Option<Conn<S>> {
382         if let Some(dispatcher) = lock.pop() {
383             // todo: shutdown and goaway
384             if !dispatcher.is_shutdown() {
385                 if let Some(conn) = dispatcher.dispatch() {
386                     lock.push(dispatcher);
387                     return Some(conn);
388                 }
389             }
390         }
391         None
392     }
393 
394     #[cfg(feature = "http3")]
exist_h3_connnull395     fn exist_h3_conn(
396         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
397     ) -> Option<Conn<S>> {
398         if let Some(dispatcher) = lock.pop() {
399             if dispatcher.is_shutdown() {
400                 return None;
401             }
402             if !dispatcher.is_goaway() {
403                 if let Some(conn) = dispatcher.dispatch() {
404                     lock.push(dispatcher);
405                     return Some(conn);
406                 }
407             }
408             // Not all requests have been processed yet
409             lock.push(dispatcher);
410         }
411         None
412     }
413 }
414