1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::mem::take; 15 use std::sync::{Arc, Mutex}; 16 17 #[cfg(feature = "http3")] 18 use ylong_http::request::uri::Authority; 19 #[cfg(any(feature = "http2", feature = "http3"))] 20 use ylong_http::request::uri::Scheme; 21 use ylong_http::request::uri::Uri; 22 23 use crate::async_impl::connector::ConnInfo; 24 #[cfg(feature = "http3")] 25 use crate::async_impl::quic::QuicConn; 26 use crate::async_impl::Connector; 27 #[cfg(feature = "http3")] 28 use crate::async_impl::Response; 29 use crate::error::HttpClientError; 30 use crate::runtime::{AsyncRead, AsyncWrite}; 31 #[cfg(feature = "http3")] 32 use crate::util::alt_svc::{AltService, AltServiceMap}; 33 #[cfg(feature = "http2")] 34 use crate::util::config::H2Config; 35 #[cfg(feature = "http3")] 36 use crate::util::config::H3Config; 37 use crate::util::config::{HttpConfig, HttpVersion}; 38 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher}; 39 use crate::util::pool::{Pool, PoolKey}; 40 #[cfg(feature = "http3")] 41 use crate::util::request::RequestArc; 42 43 pub(crate) struct ConnPool<C, S> { 44 pool: Pool<PoolKey, Conns<S>>, 45 #[cfg(feature = "http3")] 46 alt_svcs: AltServiceMap, 47 connector: Arc<C>, 48 config: HttpConfig, 49 } 50 51 impl<C: Connector> ConnPool<C, C::Stream> { 52 pub(crate) fn new(config: HttpConfig, connector: C) -> Self { 53 Self { 54 pool: Pool::new(), 55 #[cfg(feature = "http3")] 56 alt_svcs: AltServiceMap::new(), 57 connector: Arc::new(connector), 58 config, 59 } 60 } 61 62 pub(crate) async fn connect_to(&self, uri: &Uri) -> Result<Conn<C::Stream>, HttpClientError> { 63 let key = PoolKey::new( 64 uri.scheme().unwrap().clone(), 65 uri.authority().unwrap().clone(), 66 ); 67 68 #[cfg(feature = "http3")] 69 let alt_svc = self.alt_svcs.get_alt_svcs(&key); 70 71 self.pool 72 .get(key, Conns::new) 73 .conn( 74 self.config.clone(), 75 self.connector.clone(), 76 uri, 77 #[cfg(feature = "http3")] 78 alt_svc, 79 ) 80 .await 81 } 82 83 #[cfg(feature = "http3")] 84 pub(crate) fn set_alt_svcs(&self, request: RequestArc, response: &Response) { 85 self.alt_svcs.set_alt_svcs(request, response); 86 } 87 } 88 89 pub(crate) struct Conns<S> { 90 list: Arc<Mutex<Vec<ConnDispatcher<S>>>>, 91 #[cfg(feature = "http2")] 92 h2_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>, 93 #[cfg(feature = "http3")] 94 h3_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>, 95 } 96 97 impl<S> Conns<S> { newnull98 fn new() -> Self { 99 Self { 100 list: Arc::new(Mutex::new(Vec::new())), 101 102 #[cfg(feature = "http2")] 103 h2_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))), 104 105 #[cfg(feature = "http3")] 106 h3_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))), 107 } 108 } 109 110 // fn get_alt_svcs 111 } 112 113 impl<S> Clone for Conns<S> { clonenull114 fn clone(&self) -> Self { 115 Self { 116 list: self.list.clone(), 117 118 #[cfg(feature = "http2")] 119 h2_conn: self.h2_conn.clone(), 120 121 #[cfg(feature = "http3")] 122 h3_conn: self.h3_conn.clone(), 123 } 124 } 125 } 126 127 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Send + Sync + 'static> Conns<S> { 128 async fn conn<C>( 129 &mut self, 130 config: HttpConfig, 131 connector: Arc<C>, 132 url: &Uri, 133 #[cfg(feature = "http3")] alt_svc: Option<Vec<AltService>>, 134 ) -> Result<Conn<S>, HttpClientError> 135 where 136 C: Connector<Stream = S>, 137 { 138 match config.version { 139 #[cfg(feature = "http3")] 140 HttpVersion::Http3 => self.conn_h3(connector, url, config.http3_config).await, 141 #[cfg(feature = "http2")] 142 HttpVersion::Http2 => self.conn_h2(connector, url, config.http2_config).await, 143 #[cfg(feature = "http1_1")] 144 HttpVersion::Http1 => self.conn_h1(connector, url).await, 145 HttpVersion::Negotiate => { 146 #[cfg(feature = "http3")] 147 if let Some(conn) = self 148 .conn_alt_svc(&connector, url, alt_svc, config.http3_config) 149 .await 150 { 151 return Ok(conn); 152 } 153 154 #[cfg(all(feature = "http1_1", not(feature = "http2")))] 155 return self.conn_h1(connector, url).await; 156 157 #[cfg(all(feature = "http2", feature = "http1_1"))] 158 return self 159 .conn_negotiate(connector, url, config.http2_config) 160 .await; 161 } 162 } 163 } 164 165 async fn conn_h1<C>(&self, connector: Arc<C>, url: &Uri) -> Result<Conn<S>, HttpClientError> 166 where 167 C: Connector<Stream = S>, 168 { 169 if let Some(conn) = self.exist_h1_conn() { 170 return Ok(conn); 171 } 172 let dispatcher = ConnDispatcher::http1(connector.connect(url, HttpVersion::Http1).await?); 173 Ok(self.dispatch_h1_conn(dispatcher)) 174 } 175 176 #[cfg(feature = "http2")] 177 async fn conn_h2<C>( 178 &self, 179 connector: Arc<C>, 180 url: &Uri, 181 config: H2Config, 182 ) -> Result<Conn<S>, HttpClientError> 183 where 184 C: Connector<Stream = S>, 185 { 186 // The lock `h2_occupation` is used to prevent multiple coroutines from sending 187 // Requests at the same time under concurrent conditions, 188 // resulting in the creation of multiple tcp connections 189 let mut lock = self.h2_conn.lock().await; 190 191 if let Some(conn) = Self::exist_h2_conn(&mut lock) { 192 return Ok(conn); 193 } 194 let stream = connector.connect(url, HttpVersion::Http2).await?; 195 let details = stream.conn_detail(); 196 let tls = if let Some(scheme) = url.scheme() { 197 *scheme == Scheme::HTTPS 198 } else { 199 false 200 }; 201 match details.alpn() { 202 None if tls => return err_from_msg!(Connect, "The peer does not support http/2."), 203 Some(protocol) if protocol != b"h2" => { 204 return err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.") 205 } 206 _ => {} 207 } 208 209 Ok(Self::dispatch_h2_conn(config, stream, &mut lock)) 210 } 211 212 #[cfg(feature = "http3")] 213 async fn conn_h3<C>( 214 &self, 215 connector: Arc<C>, 216 url: &Uri, 217 config: H3Config, 218 ) -> Result<Conn<S>, HttpClientError> 219 where 220 C: Connector<Stream = S>, 221 { 222 let mut lock = self.h3_conn.lock().await; 223 224 if let Some(conn) = Self::exist_h3_conn(&mut lock) { 225 return Ok(conn); 226 } 227 let mut stream = connector.connect(url, HttpVersion::Http3).await?; 228 let quic_conn = stream.quic_conn().ok_or(HttpClientError::from_str( 229 crate::ErrorKind::Connect, 230 "QUIC connect failed", 231 ))?; 232 233 Ok(Self::dispatch_h3_conn(config, stream, quic_conn, &mut lock)) 234 } 235 236 #[cfg(all(feature = "http2", feature = "http1_1"))] 237 async fn conn_negotiate<C>( 238 &self, 239 connector: Arc<C>, 240 url: &Uri, 241 h2_config: H2Config, 242 ) -> Result<Conn<S>, HttpClientError> 243 where 244 C: Connector<Stream = S>, 245 { 246 match *url.scheme().unwrap() { 247 Scheme::HTTPS => { 248 let mut lock = self.h2_conn.lock().await; 249 if let Some(conn) = Self::exist_h2_conn(&mut lock) { 250 return Ok(conn); 251 } 252 253 if let Some(conn) = self.exist_h1_conn() { 254 return Ok(conn); 255 } 256 257 let stream = connector.connect(url, HttpVersion::Negotiate).await?; 258 let details = stream.conn_detail(); 259 260 let protocol = if let Some(bytes) = details.alpn() { 261 bytes 262 } else { 263 let dispatcher = ConnDispatcher::http1(stream); 264 return Ok(self.dispatch_h1_conn(dispatcher)); 265 }; 266 267 if protocol == b"http/1.1" { 268 let dispatcher = ConnDispatcher::http1(stream); 269 Ok(self.dispatch_h1_conn(dispatcher)) 270 } else if protocol == b"h2" { 271 Ok(Self::dispatch_h2_conn(h2_config, stream, &mut lock)) 272 } else { 273 err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.") 274 } 275 } 276 Scheme::HTTP => self.conn_h1(connector, url).await, 277 } 278 } 279 280 #[cfg(feature = "http3")] 281 async fn conn_alt_svc<C>( 282 &self, 283 connector: &Arc<C>, 284 url: &Uri, 285 alt_svcs: Option<Vec<AltService>>, 286 h3_config: H3Config, 287 ) -> Option<Conn<S>> 288 where 289 C: Connector<Stream = S>, 290 { 291 let mut lock = self.h3_conn.lock().await; 292 if let Some(conn) = Self::exist_h3_conn(&mut lock) { 293 return Some(conn); 294 } 295 if let Some(alt_svcs) = alt_svcs { 296 for alt_svc in alt_svcs { 297 // only support h3 alt_svc now 298 if alt_svc.http_version != HttpVersion::Http3 { 299 continue; 300 } 301 let scheme = Scheme::HTTPS; 302 let host = match alt_svc.host { 303 Some(ref host) => host.clone(), 304 None => url.host().cloned().unwrap(), 305 }; 306 let port = alt_svc.port.clone(); 307 let authority = 308 Authority::from_bytes((host.to_string() + ":" + port.as_str()).as_bytes()) 309 .ok()?; 310 let path = url.path().cloned(); 311 let query = url.query().cloned(); 312 let alt_url = Uri::from_raw_parts(Some(scheme), Some(authority), path, query); 313 let mut stream = connector.connect(&alt_url, HttpVersion::Http3).await.ok()?; 314 let quic_conn = stream.quic_conn().unwrap(); 315 return Some(Self::dispatch_h3_conn( 316 h3_config.clone(), 317 stream, 318 quic_conn, 319 &mut lock, 320 )); 321 } 322 } 323 None 324 } 325 dispatch_h1_connnull326 fn dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>) -> Conn<S> { 327 // We must be able to get the `Conn` here. 328 let conn = dispatcher.dispatch().unwrap(); 329 let mut list = self.list.lock().unwrap(); 330 list.push(dispatcher); 331 332 conn 333 } 334 335 #[cfg(feature = "http2")] dispatch_h2_connnull336 fn dispatch_h2_conn( 337 config: H2Config, 338 stream: S, 339 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 340 ) -> Conn<S> { 341 let dispatcher = ConnDispatcher::http2(config, stream); 342 let conn = dispatcher.dispatch().unwrap(); 343 lock.push(dispatcher); 344 conn 345 } 346 347 #[cfg(feature = "http3")] dispatch_h3_connnull348 fn dispatch_h3_conn( 349 config: H3Config, 350 stream: S, 351 quic_connection: QuicConn, 352 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 353 ) -> Conn<S> { 354 let dispatcher = ConnDispatcher::http3(config, stream, quic_connection); 355 let conn = dispatcher.dispatch().unwrap(); 356 lock.push(dispatcher); 357 conn 358 } 359 exist_h1_connnull360 fn exist_h1_conn(&self) -> Option<Conn<S>> { 361 let mut list = self.list.lock().unwrap(); 362 let mut conn = None; 363 let curr = take(&mut *list); 364 // TODO Distinguish between http2 connections and http1 connections. 365 for dispatcher in curr.into_iter() { 366 // Discard invalid dispatchers. 367 if dispatcher.is_shutdown() { 368 continue; 369 } 370 if conn.is_none() { 371 conn = dispatcher.dispatch(); 372 } 373 list.push(dispatcher); 374 } 375 conn 376 } 377 378 #[cfg(feature = "http2")] exist_h2_connnull379 fn exist_h2_conn( 380 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 381 ) -> Option<Conn<S>> { 382 if let Some(dispatcher) = lock.pop() { 383 // todo: shutdown and goaway 384 if !dispatcher.is_shutdown() { 385 if let Some(conn) = dispatcher.dispatch() { 386 lock.push(dispatcher); 387 return Some(conn); 388 } 389 } 390 } 391 None 392 } 393 394 #[cfg(feature = "http3")] exist_h3_connnull395 fn exist_h3_conn( 396 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 397 ) -> Option<Conn<S>> { 398 if let Some(dispatcher) = lock.pop() { 399 if dispatcher.is_shutdown() { 400 return None; 401 } 402 if !dispatcher.is_goaway() { 403 if let Some(conn) = dispatcher.dispatch() { 404 lock.push(dispatcher); 405 return Some(conn); 406 } 407 } 408 // Not all requests have been processed yet 409 lock.push(dispatcher); 410 } 411 None 412 } 413 } 414