1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! Stream Manager module. 15 16 use std::future::Future; 17 use std::pin::Pin; 18 use std::sync::atomic::{AtomicBool, Ordering}; 19 use std::sync::{Arc, Mutex}; 20 use std::task::{Context, Poll}; 21 22 use quiche::Shutdown; 23 use ylong_http::h3::{ 24 Frame, FrameDecoder, FrameEncoder, FrameKind, Frames, H3Error, H3ErrorCode, Headers, Payload, 25 Settings, StreamMessage, CONTROL_STREAM_TYPE, QPACK_DECODER_STREAM_TYPE, 26 QPACK_ENCODER_STREAM_TYPE, SETTINGS_FRAME_TYPE, 27 }; 28 29 use crate::async_impl::QuicConn; 30 use crate::runtime::{UnboundedReceiver, UnboundedSender}; 31 use crate::util::config::H3Config; 32 use crate::util::dispatcher::http3::{DispatchErrorKind, ReqMessage, RespMessage}; 33 use crate::util::h3::streams::{DataReadState, QUICStreamType, Streams}; 34 35 pub(crate) const UPD_RECV_BUF_SIZE: usize = 65535; 36 const DECODE_BUF_SIZE: usize = 1024; 37 38 pub(crate) struct StreamManager { 39 pub(crate) streams: Streams, 40 pub(crate) quic_conn: Arc<Mutex<QuicConn>>, 41 pub(crate) io_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>, 42 pub(crate) stream_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>, 43 pub(crate) req_rx: UnboundedReceiver<ReqMessage>, 44 pub(crate) stream_recv_buf: [u8; UPD_RECV_BUF_SIZE], 45 pub(crate) encoder: FrameEncoder, 46 pub(crate) decoder: FrameDecoder, 47 pub(crate) encoder_buf: [u8; DECODE_BUF_SIZE], 48 pub(crate) inst_buf: [u8; DECODE_BUF_SIZE], 49 pub(crate) peer_settings: Option<Settings>, 50 pub(crate) io_shutdown: Arc<AtomicBool>, 51 pub(crate) io_goaway: Arc<AtomicBool>, 52 } 53 54 impl StreamManager { 55 pub(crate) fn new( 56 quic_conn: Arc<Mutex<QuicConn>>, 57 io_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>, 58 stream_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>, 59 req_rx: UnboundedReceiver<ReqMessage>, 60 decoder: FrameDecoder, 61 io_shutdown: Arc<AtomicBool>, 62 io_goaway: Arc<AtomicBool>, 63 ) -> Self { 64 Self { 65 streams: Streams::new(), 66 quic_conn, 67 io_manager_tx, 68 stream_manager_rx, 69 req_rx, 70 stream_recv_buf: [0u8; UPD_RECV_BUF_SIZE], 71 encoder_buf: [0u8; DECODE_BUF_SIZE], 72 inst_buf: [0u8; DECODE_BUF_SIZE], 73 encoder: FrameEncoder::default(), 74 decoder, 75 peer_settings: None, 76 io_shutdown, 77 io_goaway, 78 } 79 } 80 poll_recv_signalnull81 fn poll_recv_signal( 82 &mut self, 83 cx: &mut Context<'_>, 84 ) -> Poll<Result<Result<(), DispatchErrorKind>, DispatchErrorKind>> { 85 #[cfg(feature = "tokio_base")] 86 match self.stream_manager_rx.poll_recv(cx) { 87 Poll::Ready(None) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)), 88 Poll::Ready(Some(data)) => Poll::Ready(Ok(data)), 89 Poll::Pending => Poll::Pending, 90 } 91 #[cfg(feature = "ylong_base")] 92 match self.stream_manager_rx.poll_recv(cx) { 93 Poll::Ready(Err(_e)) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)), 94 Poll::Ready(Ok(data)) => Poll::Ready(Ok(data)), 95 Poll::Pending => Poll::Pending, 96 } 97 } 98 poll_recv_requestnull99 fn poll_recv_request( 100 &mut self, 101 cx: &mut Context<'_>, 102 ) -> Poll<Result<ReqMessage, DispatchErrorKind>> { 103 #[cfg(feature = "tokio_base")] 104 match self.req_rx.poll_recv(cx) { 105 Poll::Ready(None) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)), 106 Poll::Ready(Some(data)) => Poll::Ready(Ok(data)), 107 Poll::Pending => Poll::Pending, 108 } 109 #[cfg(feature = "ylong_base")] 110 match self.req_rx.poll_recv(cx) { 111 Poll::Ready(Err(_e)) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)), 112 Poll::Ready(Ok(data)) => Poll::Ready(Ok(data)), 113 Poll::Pending => Poll::Pending, 114 } 115 } 116 send_inst_to_peernull117 fn send_inst_to_peer( 118 &mut self, 119 headers: &Headers, 120 quic_conn: &mut QuicConn, 121 ) -> Result<(), DispatchErrorKind> { 122 if let Some(vec) = headers.get_instruction() { 123 let qpack_decode_stream_id = 124 self.streams 125 .qpack_decode_stream_id() 126 .ok_or(DispatchErrorKind::H3(H3Error::Connection( 127 H3ErrorCode::H3InternalError, 128 )))?; 129 quic_conn.stream_send(qpack_decode_stream_id, vec, false)?; 130 } 131 Ok(()) 132 } 133 transmit_errornull134 fn transmit_error( 135 &mut self, 136 cx: &mut Context<'_>, 137 id: u64, 138 error: DispatchErrorKind, 139 ) -> Result<(), DispatchErrorKind> { 140 self.streams.send_error(cx, id, error) 141 } 142 poll_input_requestnull143 fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> { 144 self.streams.try_consume_pending_concurrency(); 145 let len = self.streams.pending_stream_len(); 146 // Some streams may be blocked due to the server not reading the message. Avoid 147 // reading these streams twice in one loop 148 for _ in 0..len { 149 if let Some(id) = self.streams.next_stream() { 150 self.input_stream_frame(cx, id)?; 151 } else { 152 break; 153 } 154 } 155 Ok(()) 156 } 157 input_stream_framenull158 fn input_stream_frame( 159 &mut self, 160 cx: &mut Context<'_>, 161 id: u64, 162 ) -> Result<(), DispatchErrorKind> { 163 if let Some(header) = self.streams.get_header(id)? { 164 self.poll_send_header(id, header)?; 165 } 166 167 // encoding means last frame is still encoding, can not create new frame before 168 // consumed. 169 if self.streams.encoding(id)? { 170 if let Err(e) = self.poll_send_frame(id, None) { 171 return match e { 172 DispatchErrorKind::Quic(quiche::Error::StreamStopped(_)) => Ok(()), 173 e => Err(e), 174 }; 175 } 176 if self.streams.encoding(id)? { 177 self.streams.push_back_pending_send(id); 178 return Ok(()); 179 } 180 } 181 182 loop { 183 match self.poll_read_body(cx, id)? { 184 DataReadState::Closed | DataReadState::Pending => { 185 break; 186 } 187 DataReadState::Ready(data) => { 188 if let Err(e) = self.poll_send_frame(id, Some(*data)) { 189 return match e { 190 DispatchErrorKind::Quic(quiche::Error::StreamStopped(_)) => Ok(()), 191 e => Err(e), 192 }; 193 } 194 if self.streams.encoding(id)? { 195 self.streams.push_back_pending_send(id); 196 break; 197 } 198 } 199 DataReadState::Finish => { 200 let mut quic_conn = self.quic_conn.lock().unwrap(); 201 quic_conn.stream_send(id, b"", true)?; 202 let _ = self.io_manager_tx.send(Ok(())); 203 break; 204 } 205 } 206 } 207 Ok(()) 208 } 209 poll_send_headernull210 fn poll_send_header(&mut self, id: u64, frame: Frame) -> Result<(), DispatchErrorKind> { 211 self.streams.set_encoding(id, true)?; 212 self.encoder.set_frame(id, frame)?; 213 let quic_conn = self.quic_conn.clone(); 214 let qpack_encode_stream_id = 215 self.streams 216 .qpack_encode_stream_id() 217 .ok_or(DispatchErrorKind::H3(H3Error::Connection( 218 H3ErrorCode::H3InternalError, 219 )))?; 220 let mut quic_conn = quic_conn.lock().unwrap(); 221 222 // invalid means stream has not been created, create it first 223 if let Err(quiche::Error::InvalidStreamState(_)) = 224 quic_conn.stream_writable(id, DECODE_BUF_SIZE) 225 { 226 quic_conn.stream_send(id, b"", false)?; 227 } 228 while quic_conn.stream_writable(id, DECODE_BUF_SIZE)? 229 && quic_conn.stream_writable(qpack_encode_stream_id, DECODE_BUF_SIZE)? 230 { 231 let (data_size, inst_size) = 232 self.encoder 233 .encode(id, &mut self.encoder_buf, &mut self.inst_buf)?; 234 if inst_size != 0 { 235 quic_conn.stream_send( 236 qpack_encode_stream_id, 237 &self.inst_buf[..inst_size], 238 false, 239 )?; 240 } 241 if data_size != 0 { 242 quic_conn.stream_send(id, &self.encoder_buf[..data_size], false)?; 243 } 244 if inst_size == 0 && data_size == 0 { 245 self.streams.set_encoding(id, false)?; 246 break; 247 } 248 } 249 250 let _ = self.io_manager_tx.send(Ok(())); 251 Ok(()) 252 } 253 poll_send_framenull254 fn poll_send_frame(&mut self, id: u64, frame: Option<Frame>) -> Result<(), DispatchErrorKind> { 255 if let Some(frame) = frame { 256 self.streams.set_encoding(id, true)?; 257 self.encoder.set_frame(id, frame)?; 258 } 259 let mut quic_conn = self.quic_conn.lock().unwrap(); 260 261 loop { 262 if !quic_conn.stream_writable(id, DECODE_BUF_SIZE)? { 263 break; 264 } 265 let (data_size, _) = 266 self.encoder 267 .encode(id, &mut self.encoder_buf, &mut self.inst_buf)?; 268 if data_size != 0 { 269 quic_conn.stream_send(id, &self.encoder_buf[..data_size], false)?; 270 let _ = self.io_manager_tx.send(Ok(())); 271 } else { 272 self.streams.set_encoding(id, false)?; 273 break; 274 } 275 } 276 Ok(()) 277 } 278 279 pub(crate) fn poll_read_body( 280 &mut self, 281 cx: &mut Context<'_>, 282 id: u64, 283 ) -> Result<DataReadState, DispatchErrorKind> { 284 const DEFAULT_MAX_FRAME_SIZE: usize = 16 * 1024; 285 let len = std::cmp::min( 286 self.quic_conn 287 .lock() 288 .unwrap() 289 .stream_capacity(id) 290 .map_err(|_| { 291 DispatchErrorKind::H3(H3Error::Stream(id, H3ErrorCode::H3InternalError)) 292 })?, 293 DEFAULT_MAX_FRAME_SIZE, 294 ); 295 let mut buf = [0u8; DEFAULT_MAX_FRAME_SIZE]; 296 self.streams.poll_sized_data(cx, id, &mut buf[..len]) 297 } 298 299 pub(crate) fn init(&mut self, config: H3Config) -> Result<(), DispatchErrorKind> { 300 self.decoder 301 .local_allowed_max_field_section_size(config.max_field_section_size() as usize); 302 self.send_settings(config)?; 303 self.open_uni_stream(QPACK_ENCODER_STREAM_TYPE)?; 304 self.open_uni_stream(QPACK_DECODER_STREAM_TYPE)?; 305 Ok(()) 306 } 307 308 pub(crate) fn open_uni_stream(&mut self, stream_type: u8) -> Result<u64, DispatchErrorKind> { 309 let buf = [stream_type]; 310 let id = match stream_type { 311 CONTROL_STREAM_TYPE => { 312 self.streams 313 .control_stream_id() 314 .ok_or(DispatchErrorKind::H3(H3Error::Connection( 315 H3ErrorCode::H3InternalError, 316 )))? 317 } 318 QPACK_ENCODER_STREAM_TYPE => { 319 self.streams 320 .qpack_encode_stream_id() 321 .ok_or(DispatchErrorKind::H3(H3Error::Connection( 322 H3ErrorCode::H3InternalError, 323 )))? 324 } 325 QPACK_DECODER_STREAM_TYPE => { 326 self.streams 327 .qpack_decode_stream_id() 328 .ok_or(DispatchErrorKind::H3(H3Error::Connection( 329 H3ErrorCode::H3InternalError, 330 )))? 331 } 332 _ => { 333 return Err(DispatchErrorKind::H3(H3Error::Connection( 334 H3ErrorCode::H3InternalError, 335 ))) 336 } 337 }; 338 let mut quic_conn = self.quic_conn.lock().unwrap(); 339 340 quic_conn.stream_send(id, &buf, false)?; 341 let _ = quic_conn.stream_priority(id, 0, false); 342 Ok(id) 343 } 344 345 pub(crate) fn send_settings(&mut self, config: H3Config) -> Result<(), DispatchErrorKind> { 346 let control_stream_id = self.open_uni_stream(CONTROL_STREAM_TYPE)?; 347 348 let mut settings = Settings::default(); 349 settings.set_max_field_section_size(config.max_field_section_size()); 350 settings.set_qpack_max_table_capacity(config.qpack_max_table_capacity()); 351 settings.set_qpack_block_stream(config.qpack_blocked_streams()); 352 353 let mut quic_conn = self.quic_conn.lock().unwrap(); 354 let settings = Frame::new(SETTINGS_FRAME_TYPE, Payload::Settings(settings)); 355 self.encoder.set_frame(control_stream_id, settings)?; 356 loop { 357 let (size, _) = self.encoder.encode( 358 control_stream_id, 359 &mut self.encoder_buf, 360 &mut self.inst_buf, 361 )?; 362 if size == 0 { 363 return Ok(()); 364 } 365 quic_conn.stream_send(control_stream_id, &self.encoder_buf[..size], false)?; 366 } 367 } 368 369 pub(crate) fn poll_stream_recv( 370 &mut self, 371 cx: &mut Context<'_>, 372 ) -> Result<(), DispatchErrorKind> { 373 let mut need_send = false; 374 let lock = self.quic_conn.clone(); 375 let mut quic_conn = lock.lock().unwrap(); 376 377 if let Some(stream_id) = self.streams.peer_control_stream_id() { 378 need_send |= self.try_recv_uni_stream(cx, &mut quic_conn, stream_id)?; 379 }; 380 if let Some(stream_id) = self.streams.peer_qpack_encode_stream_id() { 381 need_send |= self.try_recv_uni_stream(cx, &mut quic_conn, stream_id)?; 382 }; 383 if let Some(stream_id) = self.streams.peer_qpack_decode_stream_id() { 384 need_send |= self.try_recv_uni_stream(cx, &mut quic_conn, stream_id)?; 385 }; 386 for id in quic_conn.readable() { 387 if !self.streams.frame_acceptable(id) { 388 continue; 389 } 390 need_send |= self.read_stream(cx, &mut quic_conn, id)?; 391 } 392 393 if quic_conn.is_closed() { 394 self.shutdown(cx, &DispatchErrorKind::Disconnect); 395 } 396 397 if need_send { 398 let _ = self.io_manager_tx.send(Ok(())); 399 } 400 Ok(()) 401 } 402 try_recv_uni_streamnull403 fn try_recv_uni_stream( 404 &mut self, 405 cx: &mut Context<'_>, 406 quic_conn: &mut QuicConn, 407 stream_id: u64, 408 ) -> Result<bool, DispatchErrorKind> { 409 if quic_conn.stream_finished(stream_id) { 410 return Err(DispatchErrorKind::H3(H3Error::Connection( 411 H3ErrorCode::H3ClosedCriticalStream, 412 ))); 413 } 414 415 match self.read_stream(cx, quic_conn, stream_id) { 416 Ok(need_send) => { 417 if quic_conn.stream_finished(stream_id) { 418 return Err(DispatchErrorKind::H3(H3Error::Connection( 419 H3ErrorCode::H3ClosedCriticalStream, 420 ))); 421 } 422 Ok(need_send) 423 } 424 Err(e) => Err(e), 425 } 426 } 427 read_streamnull428 fn read_stream( 429 &mut self, 430 cx: &mut Context<'_>, 431 quic_conn: &mut QuicConn, 432 id: u64, 433 ) -> Result<bool, DispatchErrorKind> { 434 if QUICStreamType::from(id) == QUICStreamType::ServerInitialBidirectional { 435 return Err(DispatchErrorKind::H3(H3Error::Connection( 436 H3ErrorCode::H3StreamCreationError, 437 ))); 438 } 439 let mut need_send = false; 440 loop { 441 let (size, fin) = match quic_conn.stream_recv(id, &mut self.stream_recv_buf) { 442 Ok((size, fin)) => { 443 need_send = true; 444 (size, fin) 445 } 446 Err(quiche::Error::Done) => { 447 return Ok(need_send); 448 } 449 Err(quiche::Error::StreamStopped(err)) | Err(quiche::Error::StreamReset(err)) => { 450 if err != H3ErrorCode::H3NoError as u64 { 451 return Err(DispatchErrorKind::H3(H3Error::Stream(id, err.into()))); 452 } else { 453 return Ok(false); 454 } 455 } 456 Err(e) => { 457 return Err(DispatchErrorKind::Quic(e)); 458 } 459 }; 460 self.process_recv_data(cx, id, size, quic_conn)?; 461 if fin { 462 self.finish_stream(cx, id)?; 463 return Ok(true); 464 } 465 } 466 } 467 process_recv_datanull468 fn process_recv_data( 469 &mut self, 470 cx: &mut Context<'_>, 471 id: u64, 472 size: usize, 473 quic_conn: &mut QuicConn, 474 ) -> Result<(), DispatchErrorKind> { 475 let mut stream_id = id; 476 let mut size = size; 477 loop { 478 match self 479 .decoder 480 .decode(stream_id, &self.stream_recv_buf[..size]) 481 { 482 Ok(StreamMessage::Request(frames)) => { 483 self.recv_request_stream(cx, stream_id, frames, quic_conn)?; 484 } 485 Ok(StreamMessage::Push(_id, _frames)) => { 486 // MAX_PUSH_ID not send, Push Stream means error 487 return Err(DispatchErrorKind::H3(H3Error::Connection( 488 H3ErrorCode::H3IdError, 489 ))); 490 } 491 Ok(StreamMessage::QpackDecoder(order)) => { 492 self.recv_qpack_decode_stream(stream_id, order)?; 493 } 494 Ok(StreamMessage::Control(frames)) => { 495 self.recv_control_stream(cx, stream_id, frames)?; 496 } 497 Ok(StreamMessage::WaitingMore) | Ok(StreamMessage::Unknown) => {} 498 Ok(StreamMessage::QpackEncoder(vec)) => { 499 self.recv_qpack_encode_stream(stream_id, vec)?; 500 } 501 Err(e) => { 502 self.transmit_error(cx, stream_id, DispatchErrorKind::H3(e))?; 503 } 504 } 505 if let Some(id) = self.streams.get_resume_stream_id() { 506 stream_id = id; 507 } else { 508 return Ok(()); 509 }; 510 size = 0; 511 } 512 } 513 recv_qpack_encode_streamnull514 fn recv_qpack_encode_stream( 515 &mut self, 516 stream_id: u64, 517 vec: Vec<u64>, 518 ) -> Result<(), DispatchErrorKind> { 519 self.streams.set_peer_qpack_encode_stream_id(stream_id)?; 520 for resume_id in vec { 521 self.streams.resume_stream_recv(resume_id); 522 } 523 Ok(()) 524 } 525 recv_request_streamnull526 fn recv_request_stream( 527 &mut self, 528 cx: &mut Context<'_>, 529 id: u64, 530 frames: Frames, 531 quic_conn: &mut QuicConn, 532 ) -> Result<(), DispatchErrorKind> { 533 for kind in frames.into_iter() { 534 let frame = match kind { 535 FrameKind::Complete(frame) => frame, 536 FrameKind::Blocked => { 537 self.streams.pend_stream_recv(id); 538 return Ok(()); 539 } 540 FrameKind::Partial => return Ok(()), 541 }; 542 match frame.payload() { 543 Payload::Headers(headers) => { 544 self.send_inst_to_peer(headers, quic_conn)?; 545 self.streams.send_frame(cx, id, *frame)?; 546 } 547 Payload::Data(_) => { 548 self.streams.send_frame(cx, id, *frame)?; 549 } 550 Payload::PushPromise(_) => { 551 return Err(DispatchErrorKind::H3(H3Error::Connection( 552 H3ErrorCode::H3IdError, 553 ))) 554 } 555 _ => { 556 return Err(DispatchErrorKind::H3(H3Error::Connection( 557 H3ErrorCode::H3FrameUnexpected, 558 ))) 559 } 560 } 561 } 562 Ok(()) 563 } 564 recv_control_streamnull565 fn recv_control_stream( 566 &mut self, 567 cx: &mut Context<'_>, 568 id: u64, 569 frames: Frames, 570 ) -> Result<(), DispatchErrorKind> { 571 let mut is_first_frame = if let Some(stream_id) = self.streams.peer_control_stream_id() { 572 if stream_id != id { 573 return Err(DispatchErrorKind::H3(H3Error::Connection( 574 H3ErrorCode::H3StreamCreationError, 575 ))); 576 } 577 false 578 } else { 579 self.streams.set_peer_control_stream_id(id)?; 580 true 581 }; 582 for frame in frames.iter() { 583 let FrameKind::Complete(frame) = frame else { 584 continue; 585 }; 586 match frame.payload() { 587 Payload::Settings(settings) => { 588 self.recv_setting_frame(settings)?; 589 is_first_frame = false; 590 } 591 Payload::Goaway(goaway) => { 592 self.recv_goaway_frame(cx, *goaway.get_id())?; 593 } 594 Payload::CancelPush(_cancel) => { 595 return Err(DispatchErrorKind::H3(H3Error::Connection( 596 H3ErrorCode::H3IdError, 597 ))); 598 } 599 _ => { 600 return Err(DispatchErrorKind::H3(H3Error::Connection( 601 H3ErrorCode::H3FrameUnexpected, 602 ))); 603 } 604 } 605 if is_first_frame { 606 return Err(DispatchErrorKind::H3(H3Error::Connection( 607 H3ErrorCode::H3MissingSettings, 608 ))); 609 } 610 } 611 Ok(()) 612 } 613 recv_qpack_decode_streamnull614 fn recv_qpack_decode_stream( 615 &mut self, 616 stream_id: u64, 617 order: Vec<u8>, 618 ) -> Result<(), DispatchErrorKind> { 619 self.streams.set_peer_qpack_decode_stream_id(stream_id)?; 620 self.encoder.decode_remote_inst(&order)?; 621 Ok(()) 622 } 623 recv_setting_framenull624 fn recv_setting_frame(&mut self, settings: &Settings) -> Result<(), DispatchErrorKind> { 625 if self.peer_settings.is_some() { 626 return Err(DispatchErrorKind::H3(H3Error::Connection( 627 H3ErrorCode::H3FrameUnexpected, 628 ))); 629 } 630 self.peer_settings = Some(settings.clone()); 631 if let Some(value) = settings.qpack_max_table_capacity() { 632 self.encoder.set_max_table_capacity(value as usize)?; 633 } 634 if let Some(value) = settings.qpack_block_stream() { 635 self.encoder.set_max_blocked_stream_size(value as usize); 636 } 637 Ok(()) 638 } 639 recv_goaway_framenull640 fn recv_goaway_frame( 641 &mut self, 642 cx: &mut Context<'_>, 643 goaway_id: u64, 644 ) -> Result<(), DispatchErrorKind> { 645 self.io_goaway.store(true, Ordering::Relaxed); 646 self.req_rx.close(); 647 self.streams.goaway(cx, goaway_id)?; 648 Ok(()) 649 } 650 handle_errornull651 fn handle_error(&mut self, cx: &mut Context<'_>, err: &DispatchErrorKind) -> bool { 652 match err { 653 DispatchErrorKind::H3(H3Error::Stream(id, e)) => { 654 self.handle_stream_error(cx, *id, e); 655 false 656 } 657 DispatchErrorKind::Quic(quiche::Error::InvalidStreamState(id)) => { 658 self.handle_stream_error(cx, *id, &H3ErrorCode::H3NoError); 659 false 660 } 661 err => { 662 self.handle_connection_error(cx, err); 663 true 664 } 665 } 666 } 667 handle_stream_errornull668 fn handle_stream_error(&mut self, cx: &mut Context<'_>, id: u64, err: &H3ErrorCode) { 669 let _ = self 670 .quic_conn 671 .lock() 672 .unwrap() 673 .stream_shutdown(id, Shutdown::Read, *err as u64); 674 self.streams.shutdown_stream(cx, id, err); 675 } 676 handle_connection_errornull677 fn handle_connection_error(&mut self, cx: &mut Context<'_>, err: &DispatchErrorKind) { 678 self.shutdown(cx, err); 679 let err = match err { 680 DispatchErrorKind::H3(H3Error::Connection(err)) => *err, 681 _ => H3ErrorCode::H3InternalError, 682 }; 683 let _ = self.quic_conn.lock().unwrap().close(true, err as u64, b""); 684 let _ = self.io_manager_tx.send(Ok(())); 685 self.req_rx.close(); 686 } 687 shutdownnull688 fn shutdown(&mut self, cx: &mut Context<'_>, err: &DispatchErrorKind) { 689 self.io_shutdown.store(true, Ordering::Relaxed); 690 self.streams.shutdown(cx, err); 691 } 692 finish_streamnull693 fn finish_stream(&mut self, cx: &mut Context<'_>, id: u64) -> Result<(), DispatchErrorKind> { 694 self.streams.finish_stream(cx, id)?; 695 self.encoder.finish_stream(id)?; 696 self.decoder.finish_stream(id)?; 697 if self.streams.goaway_id().is_some() && self.streams.current_concurrency() == 0 { 698 self.io_shutdown.store(true, Ordering::Relaxed); 699 } 700 Ok(()) 701 } 702 703 pub(crate) fn poll_blocked_message( 704 &mut self, 705 cx: &mut Context<'_>, 706 ) -> Poll<Result<(), DispatchErrorKind>> { 707 self.streams.poll_blocked_message(cx) 708 } 709 } 710 711 impl Future for StreamManager { 712 type Output = Result<(), DispatchErrorKind>; 713 pollnull714 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 715 let this = self.get_mut(); 716 loop { 717 // 1 recv stream_manager_rx, meaning data to send/recv 718 match this.poll_recv_signal(cx) { 719 // consume all the signals 720 Poll::Ready(Ok(Ok(()))) => continue, 721 Poll::Ready(Ok(Err(e))) | Poll::Ready(Err(e)) => { 722 if this.handle_error(cx, &e) { 723 return Poll::Ready(Err(e)); 724 } 725 } 726 Poll::Pending => {} 727 } 728 729 // 2 check id's channel sendable / control/qpack/push, decode, send or cache 730 // frame if stream_recv, send io_manager_tx to io manager 731 if let Err(e) = this.poll_stream_recv(cx) { 732 if this.handle_error(cx, &e) { 733 return Poll::Ready(Err(e)); 734 } 735 } 736 737 if let Poll::Ready(Err(e)) = this.poll_blocked_message(cx) { 738 if this.handle_error(cx, &e) { 739 return Poll::Ready(Err(e)); 740 } 741 } 742 743 // 3 recv req_rx, check concurrency 744 loop { 745 let req = match this.poll_recv_request(cx) { 746 Poll::Ready(Ok(req)) => req, 747 Poll::Ready(Err(e)) => { 748 if this.handle_error(cx, &e) { 749 return Poll::Ready(Err(e)); 750 } 751 break; 752 } 753 Poll::Pending => break, 754 }; 755 if let Err(e) = this.streams.new_unidirectional_stream( 756 req.request.header, 757 req.request.data, 758 req.frame_tx.clone(), 759 ) { 760 let _ = req.frame_tx.try_send(RespMessage::OutputExit(e)); 761 } 762 } 763 764 // 4 in concurrency stream, set frame to encoder, set encoding flag, get encode 765 // result send flag to io manager 766 if let Err(e) = this.poll_input_request(cx) { 767 if this.handle_error(cx, &e) { 768 return Poll::Ready(Err(e)); 769 } 770 } 771 return Poll::Pending; 772 } 773 } 774 } 775