1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! Streams manage coroutine. 15 16 use std::future::Future; 17 use std::pin::Pin; 18 use std::sync::{Arc, Mutex}; 19 use std::task::{Context, Poll}; 20 21 use ylong_http::h2::{ 22 ErrorCode, Frame, FrameFlags, Goaway, H2Error, Payload, Ping, RstStream, Setting, StreamId, 23 }; 24 25 use crate::runtime::{BoundedReceiver, UnboundedReceiver, UnboundedSender}; 26 use crate::util::dispatcher::http2::{ 27 DispatchErrorKind, OutputMessage, ReqMessage, RespMessage, SettingsState, SettingsSync, 28 StreamController, 29 }; 30 use crate::util::h2::streams::{DataReadState, FrameRecvState, StreamEndState}; 31 32 #[derive(Copy, Clone)] 33 enum ManagerState { 34 Send, 35 Receive, 36 Exit(DispatchErrorKind), 37 } 38 39 pub(crate) struct ConnManager { 40 state: ManagerState, 41 next_state: ManagerState, 42 // Synchronize SETTINGS frames sent by the client. 43 settings: Arc<Mutex<SettingsSync>>, 44 // channel transmitter between manager and io input. 45 input_tx: UnboundedSender<Frame>, 46 // channel receiver between manager and io output. 47 resp_rx: BoundedReceiver<OutputMessage>, 48 // channel receiver between manager and stream coroutine. 49 req_rx: UnboundedReceiver<ReqMessage>, 50 controller: StreamController, 51 handshakes: HandShakes, 52 } 53 54 struct HandShakes { 55 local: bool, 56 peer: bool, 57 } 58 59 impl Future for ConnManager { 60 type Output = Result<(), DispatchErrorKind>; 61 pollnull62 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 63 let manager = self.get_mut(); 64 loop { 65 match manager.state { 66 ManagerState::Send => { 67 if manager.poll_blocked_frames(cx).is_pending() { 68 return Poll::Pending; 69 } 70 } 71 ManagerState::Receive => { 72 // Receives a response frame from io output. 73 match manager.resp_rx.poll_recv(cx) { 74 #[cfg(feature = "tokio_base")] 75 Poll::Ready(Some(message)) => match message { 76 OutputMessage::Output(frame) => { 77 if manager.poll_recv_message(cx, frame)?.is_pending() { 78 return Poll::Pending; 79 } 80 } 81 // io output occurs error. 82 OutputMessage::OutputExit(e) => { 83 // Note error returned immediately. 84 if manager.manage_resp_error(cx, e)?.is_pending() { 85 return Poll::Pending; 86 } 87 } 88 }, 89 #[cfg(feature = "ylong_base")] 90 Poll::Ready(Ok(message)) => match message { 91 OutputMessage::Output(frame) => { 92 if manager.poll_recv_message(cx, frame)?.is_pending() { 93 return Poll::Pending; 94 } 95 } 96 // io output occurs error. 97 OutputMessage::OutputExit(e) => { 98 if manager.manage_resp_error(cx, e)?.is_pending() { 99 return Poll::Pending; 100 } 101 } 102 }, 103 #[cfg(feature = "tokio_base")] 104 Poll::Ready(None) => { 105 return manager.poll_channel_closed_exit(cx); 106 } 107 #[cfg(feature = "ylong_base")] 108 Poll::Ready(Err(_e)) => { 109 return manager.poll_channel_closed_exit(cx); 110 } 111 112 Poll::Pending => { 113 // TODO manage error state. 114 return manager.manage_pending_state(cx); 115 } 116 } 117 } 118 ManagerState::Exit(e) => return Poll::Ready(Err(e)), 119 } 120 } 121 } 122 } 123 124 impl ConnManager { 125 pub(crate) fn new( 126 settings: Arc<Mutex<SettingsSync>>, 127 input_tx: UnboundedSender<Frame>, 128 resp_rx: BoundedReceiver<OutputMessage>, 129 req_rx: UnboundedReceiver<ReqMessage>, 130 controller: StreamController, 131 ) -> Self { 132 Self { 133 state: ManagerState::Receive, 134 next_state: ManagerState::Receive, 135 settings, 136 input_tx, 137 resp_rx, 138 req_rx, 139 controller, 140 handshakes: HandShakes { 141 local: false, 142 peer: false, 143 }, 144 } 145 } 146 manage_pending_statenull147 fn manage_pending_state( 148 &mut self, 149 cx: &mut Context<'_>, 150 ) -> Poll<Result<(), DispatchErrorKind>> { 151 // The manager previously accepted a GOAWAY Frame. 152 if let Some(code) = self.controller.recved_go_away { 153 self.poll_deal_with_go_away(code)?; 154 } 155 self.controller.streams.window_update_conn(&self.input_tx)?; 156 self.controller 157 .streams 158 .window_update_streams(&self.input_tx)?; 159 self.poll_recv_request(cx)?; 160 if self.handshakes.local && self.handshakes.peer { 161 self.poll_input_request(cx)?; 162 } 163 Poll::Pending 164 } 165 poll_recv_requestnull166 fn poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> { 167 loop { 168 #[cfg(feature = "tokio_base")] 169 let message = match self.req_rx.poll_recv(cx) { 170 Poll::Ready(Some(message)) => message, 171 Poll::Ready(None) => return Err(DispatchErrorKind::ChannelClosed), 172 Poll::Pending => break, 173 }; 174 #[cfg(feature = "ylong_base")] 175 let message = match self.req_rx.poll_recv(cx) { 176 Poll::Ready(Ok(message)) => message, 177 Poll::Ready(Err(_e)) => return Err(DispatchErrorKind::ChannelClosed), 178 Poll::Pending => break, 179 }; 180 let id = match self.controller.streams.generate_id() { 181 Ok(id) => id, 182 Err(e) => { 183 let _ = message.sender.try_send(RespMessage::OutputExit(e)); 184 break; 185 } 186 }; 187 let headers = Frame::new(id, message.request.flag, message.request.payload); 188 if self.controller.streams.reach_max_concurrency() 189 || !self.controller.streams.is_pending_concurrency_empty() 190 { 191 self.controller.streams.push_pending_concurrency(id) 192 } else { 193 self.controller.streams.increase_current_concurrency(); 194 self.controller.streams.push_back_pending_send(id) 195 } 196 self.controller.senders.insert(id, message.sender); 197 self.controller 198 .streams 199 .insert(id, headers, message.request.data); 200 } 201 Ok(()) 202 } 203 poll_input_requestnull204 fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> { 205 self.controller.streams.try_consume_pending_concurrency(); 206 let size = self.controller.streams.pending_stream_num(); 207 let mut index = 0; 208 while index < size { 209 match self.controller.streams.next_pending_stream() { 210 None => { 211 break; 212 } 213 Some(id) => { 214 self.input_stream_frame(cx, id)?; 215 } 216 } 217 index += 1; 218 } 219 Ok(()) 220 } 221 input_stream_framenull222 fn input_stream_frame( 223 &mut self, 224 cx: &mut Context<'_>, 225 id: StreamId, 226 ) -> Result<(), DispatchErrorKind> { 227 match self.controller.streams.headers(id)? { 228 None => {} 229 Some(header) => { 230 self.poll_send_frame(header)?; 231 } 232 } 233 234 loop { 235 match self.controller.streams.poll_read_body(cx, id)? { 236 DataReadState::Closed => { 237 break; 238 } 239 DataReadState::Pending => { 240 break; 241 } 242 DataReadState::Ready(data) => { 243 self.poll_send_frame(data)?; 244 } 245 DataReadState::Finish(frame) => { 246 self.poll_send_frame(frame)?; 247 break; 248 } 249 } 250 } 251 Ok(()) 252 } 253 poll_send_framenull254 fn poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 255 match frame.payload() { 256 Payload::Headers(_) => { 257 if let FrameRecvState::Err(e) = self 258 .controller 259 .streams 260 .send_headers_frame(frame.stream_id(), frame.flags().is_end_stream()) 261 { 262 // Never return FrameRecvState::Ignore case. 263 return Err(e.into()); 264 } 265 } 266 Payload::Data(_) => { 267 if let FrameRecvState::Err(e) = self 268 .controller 269 .streams 270 .send_data_frame(frame.stream_id(), frame.flags().is_end_stream()) 271 { 272 // Never return FrameRecvState::Ignore case. 273 return Err(e.into()); 274 } 275 } 276 _ => {} 277 } 278 279 self.input_tx 280 .send(frame) 281 .map_err(|_e| DispatchErrorKind::ChannelClosed) 282 } 283 poll_recv_framenull284 fn poll_recv_frame( 285 &mut self, 286 cx: &mut Context<'_>, 287 frame: Frame, 288 ) -> Poll<Result<(), DispatchErrorKind>> { 289 match frame.payload() { 290 Payload::Settings(_settings) => { 291 self.recv_settings_frame(frame)?; 292 } 293 Payload::Ping(_ping) => { 294 self.recv_ping_frame(frame)?; 295 } 296 Payload::PushPromise(_) => { 297 // TODO The current settings_enable_push setting is fixed to false. 298 return Poll::Ready(Err( 299 H2Error::ConnectionError(ErrorCode::ProtocolError).into() 300 )); 301 } 302 Payload::Goaway(_go_away) => { 303 return self.recv_go_away_frame(cx, frame).map_err(Into::into); 304 } 305 Payload::RstStream(_reset) => { 306 return self.recv_reset_frame(cx, frame).map_err(Into::into); 307 } 308 Payload::Headers(_headers) => { 309 return self.recv_header_frame(cx, frame).map_err(Into::into); 310 } 311 Payload::Data(_data) => { 312 return self.recv_data_frame(cx, frame).map_err(Into::into); 313 } 314 Payload::WindowUpdate(_windows) => { 315 self.recv_window_frame(frame)?; 316 } 317 // Priority is no longer recommended, so keep it compatible but not processed. 318 Payload::Priority(_priority) => {} 319 } 320 Poll::Ready(Ok(())) 321 } 322 recv_settings_framenull323 fn recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 324 let settings = if let Payload::Settings(settings) = frame.payload() { 325 settings 326 } else { 327 // this will not happen forever. 328 return Ok(()); 329 }; 330 331 if frame.flags().is_ack() { 332 let mut connection = self.settings.lock().unwrap(); 333 334 if let SettingsState::Acknowledging(ref acknowledged) = connection.settings { 335 for setting in acknowledged.get_settings() { 336 if let Setting::InitialWindowSize(size) = setting { 337 self.controller 338 .streams 339 .apply_recv_initial_window_size(*size); 340 } 341 } 342 } 343 connection.settings = SettingsState::Synced; 344 self.handshakes.local = true; 345 Ok(()) 346 } else { 347 for setting in settings.get_settings() { 348 if let Setting::MaxConcurrentStreams(num) = setting { 349 self.controller.streams.apply_max_concurrent_streams(*num); 350 } 351 if let Setting::InitialWindowSize(size) = setting { 352 self.controller 353 .streams 354 .apply_send_initial_window_size(*size)?; 355 } 356 } 357 358 // The reason for copying the payload is to pass information to the io input to 359 // set the frame encoder, and the input will empty the 360 // payload when it is sent 361 let new_settings = Frame::new( 362 frame.stream_id(), 363 FrameFlags::new(0x1), 364 frame.payload().clone(), 365 ); 366 self.input_tx 367 .send(new_settings) 368 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 369 self.handshakes.peer = true; 370 Ok(()) 371 } 372 } 373 recv_ping_framenull374 fn recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 375 let ping = if let Payload::Ping(ping) = frame.payload() { 376 ping 377 } else { 378 // this will not happen forever. 379 return Ok(()); 380 }; 381 if frame.flags().is_ack() { 382 // TODO The client does not have the logic to send ping frames. Therefore, the 383 // ack ping is not processed. 384 Ok(()) 385 } else { 386 self.input_tx 387 .send(Ping::ack(ping.clone())) 388 .map_err(|_e| DispatchErrorKind::ChannelClosed) 389 } 390 } 391 recv_go_away_framenull392 fn recv_go_away_frame( 393 &mut self, 394 cx: &mut Context<'_>, 395 frame: Frame, 396 ) -> Poll<Result<(), H2Error>> { 397 let go_away = if let Payload::Goaway(goaway) = frame.payload() { 398 goaway 399 } else { 400 // this will not happen forever. 401 return Poll::Ready(Ok(())); 402 }; 403 // Prevents the current connection from generating a new stream. 404 self.controller.shutdown(); 405 self.req_rx.close(); 406 let last_stream_id = go_away.get_last_stream_id(); 407 let streams = self.controller.get_unsent_streams(last_stream_id)?; 408 409 let error = H2Error::ConnectionError(ErrorCode::try_from(go_away.get_error_code())?); 410 411 let mut blocked = false; 412 for stream_id in streams { 413 match self.controller.send_message_to_stream( 414 cx, 415 stream_id, 416 RespMessage::OutputExit(error.into()), 417 ) { 418 // ignore error when going away. 419 Poll::Ready(_) => {} 420 Poll::Pending => { 421 blocked = true; 422 } 423 } 424 } 425 // Exit after the allowed stream is complete. 426 self.controller.recved_go_away = Some(go_away.get_error_code()); 427 if blocked { 428 Poll::Pending 429 } else { 430 Poll::Ready(Ok(())) 431 } 432 } 433 recv_reset_framenull434 fn recv_reset_frame( 435 &mut self, 436 cx: &mut Context<'_>, 437 frame: Frame, 438 ) -> Poll<Result<(), H2Error>> { 439 match self.controller.streams.recv_remote_reset(frame.stream_id()) { 440 StreamEndState::OK => self.controller.send_message_to_stream( 441 cx, 442 frame.stream_id(), 443 RespMessage::Output(frame), 444 ), 445 StreamEndState::Err(e) => Poll::Ready(Err(e)), 446 StreamEndState::Ignore => Poll::Ready(Ok(())), 447 } 448 } 449 recv_header_framenull450 fn recv_header_frame( 451 &mut self, 452 cx: &mut Context<'_>, 453 frame: Frame, 454 ) -> Poll<Result<(), H2Error>> { 455 match self 456 .controller 457 .streams 458 .recv_headers(frame.stream_id(), frame.flags().is_end_stream()) 459 { 460 FrameRecvState::OK => self.controller.send_message_to_stream( 461 cx, 462 frame.stream_id(), 463 RespMessage::Output(frame), 464 ), 465 FrameRecvState::Err(e) => Poll::Ready(Err(e)), 466 FrameRecvState::Ignore => Poll::Ready(Ok(())), 467 } 468 } 469 recv_data_framenull470 fn recv_data_frame(&mut self, cx: &mut Context<'_>, frame: Frame) -> Poll<Result<(), H2Error>> { 471 let data = if let Payload::Data(data) = frame.payload() { 472 data 473 } else { 474 // this will not happen forever. 475 return Poll::Ready(Ok(())); 476 }; 477 let id = frame.stream_id(); 478 let len = data.size() as u32; 479 480 self.controller.streams.release_conn_recv_window(len)?; 481 self.controller 482 .streams 483 .release_stream_recv_window(id, len)?; 484 485 match self 486 .controller 487 .streams 488 .recv_data(id, frame.flags().is_end_stream()) 489 { 490 FrameRecvState::OK => self.controller.send_message_to_stream( 491 cx, 492 frame.stream_id(), 493 RespMessage::Output(frame), 494 ), 495 FrameRecvState::Ignore => Poll::Ready(Ok(())), 496 FrameRecvState::Err(e) => Poll::Ready(Err(e)), 497 } 498 } 499 recv_window_framenull500 fn recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 501 let windows = if let Payload::WindowUpdate(windows) = frame.payload() { 502 windows 503 } else { 504 // this will not happen forever. 505 return Ok(()); 506 }; 507 let id = frame.stream_id(); 508 let increment = windows.get_increment(); 509 if id == 0 { 510 self.controller 511 .streams 512 .increase_conn_send_window(increment)?; 513 self.controller.streams.reassign_conn_send_window(); 514 } else { 515 self.controller 516 .streams 517 .reassign_stream_send_window(id, increment)?; 518 } 519 Ok(()) 520 } 521 manage_resp_errornull522 fn manage_resp_error( 523 &mut self, 524 cx: &mut Context<'_>, 525 kind: DispatchErrorKind, 526 ) -> Poll<Result<(), DispatchErrorKind>> { 527 match kind { 528 DispatchErrorKind::H2(h2) => match h2 { 529 H2Error::StreamError(id, code) => self.manage_stream_error(cx, id, code), 530 H2Error::ConnectionError(code) => self.manage_conn_error(cx, code), 531 }, 532 other => { 533 let blocked = self.exit_with_error(cx, other); 534 if blocked { 535 self.state = ManagerState::Send; 536 self.next_state = ManagerState::Exit(other); 537 Poll::Pending 538 } else { 539 Poll::Ready(Err(other)) 540 } 541 } 542 } 543 } 544 manage_stream_errornull545 fn manage_stream_error( 546 &mut self, 547 cx: &mut Context<'_>, 548 id: StreamId, 549 code: ErrorCode, 550 ) -> Poll<Result<(), DispatchErrorKind>> { 551 let rest_payload = RstStream::new(code.into_code()); 552 let frame = Frame::new(id, FrameFlags::empty(), Payload::RstStream(rest_payload)); 553 match self.controller.streams.send_local_reset(id) { 554 StreamEndState::OK => { 555 self.input_tx 556 .send(frame) 557 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 558 559 match self.controller.send_message_to_stream( 560 cx, 561 id, 562 RespMessage::OutputExit(DispatchErrorKind::ChannelClosed), 563 ) { 564 Poll::Ready(_) => { 565 // error at the stream level due to early exit of the coroutine in which the 566 // request is located, ignored to avoid manager coroutine exit. 567 Poll::Ready(Ok(())) 568 } 569 Poll::Pending => { 570 self.state = ManagerState::Send; 571 // stream error will not cause manager exit with error(exit state). Takes 572 // effect only if blocked. 573 self.next_state = ManagerState::Receive; 574 Poll::Pending 575 } 576 } 577 } 578 StreamEndState::Ignore => Poll::Ready(Ok(())), 579 StreamEndState::Err(e) => { 580 // This error will never happen. 581 Poll::Ready(Err(e.into())) 582 } 583 } 584 } 585 manage_conn_errornull586 fn manage_conn_error( 587 &mut self, 588 cx: &mut Context<'_>, 589 code: ErrorCode, 590 ) -> Poll<Result<(), DispatchErrorKind>> { 591 // last_stream_id is set to 0 to ensure that all pushed streams are 592 // shutdown. 593 let go_away_payload = Goaway::new( 594 code.into_code(), 595 self.controller.streams.latest_remote_id, 596 vec![], 597 ); 598 let frame = Frame::new( 599 0, 600 FrameFlags::empty(), 601 Payload::Goaway(go_away_payload.clone()), 602 ); 603 // Avoid sending the same GO_AWAY frame multiple times. 604 if let Some(ref go_away) = self.controller.go_away_sync.going_away { 605 if go_away.get_error_code() == go_away_payload.get_error_code() 606 && go_away.get_last_stream_id() == go_away_payload.get_last_stream_id() 607 { 608 return Poll::Ready(Ok(())); 609 } 610 } 611 self.controller.go_away_sync.going_away = Some(go_away_payload); 612 self.input_tx 613 .send(frame) 614 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 615 616 let blocked = 617 self.exit_with_error(cx, DispatchErrorKind::H2(H2Error::ConnectionError(code))); 618 619 if blocked { 620 self.state = ManagerState::Send; 621 self.next_state = ManagerState::Exit(H2Error::ConnectionError(code).into()); 622 Poll::Pending 623 } else { 624 // TODO When current client has an error, 625 // it always sends the GO_AWAY frame at the first time and exits directly. 626 // Should we consider letting part of the unfinished stream complete? 627 Poll::Ready(Err(H2Error::ConnectionError(code).into())) 628 } 629 } 630 poll_deal_with_go_awaynull631 fn poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind> { 632 // The client that receives GO_AWAY needs to return a GO_AWAY to the server 633 // before closed. The preceding operations before receiving the frame 634 // ensure that the connection is in the closing state. 635 if self.controller.streams.is_closed() { 636 let last_stream_id = self.controller.streams.latest_remote_id; 637 let go_away_payload = Goaway::new(error_code, last_stream_id, vec![]); 638 let frame = Frame::new( 639 0, 640 FrameFlags::empty(), 641 Payload::Goaway(go_away_payload.clone()), 642 ); 643 644 self.send_peer_goaway(frame, go_away_payload, error_code)?; 645 return Err(H2Error::ConnectionError(ErrorCode::try_from(error_code)?).into()); 646 } 647 Ok(()) 648 } 649 send_peer_goawaynull650 fn send_peer_goaway( 651 &mut self, 652 frame: Frame, 653 payload: Goaway, 654 err_code: u32, 655 ) -> Result<(), DispatchErrorKind> { 656 match self.controller.go_away_sync.going_away { 657 None => { 658 self.controller.go_away_sync.going_away = Some(payload); 659 self.input_tx 660 .send(frame) 661 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 662 } 663 Some(ref go_away) => { 664 // Whether the same GOAWAY Frame has been sent before. 665 if !(go_away.get_error_code() == err_code 666 && go_away.get_last_stream_id() == self.controller.streams.latest_remote_id) 667 { 668 self.controller.go_away_sync.going_away = Some(payload); 669 self.input_tx 670 .send(frame) 671 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 672 } 673 } 674 } 675 Ok(()) 676 } 677 poll_recv_messagenull678 fn poll_recv_message( 679 &mut self, 680 cx: &mut Context<'_>, 681 frame: Frame, 682 ) -> Poll<Result<(), DispatchErrorKind>> { 683 match self.poll_recv_frame(cx, frame) { 684 Poll::Ready(Err(kind)) => self.manage_resp_error(cx, kind), 685 Poll::Pending => { 686 self.state = ManagerState::Send; 687 self.next_state = ManagerState::Receive; 688 Poll::Pending 689 } 690 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), 691 } 692 } 693 poll_channel_closed_exitnull694 fn poll_channel_closed_exit( 695 &mut self, 696 cx: &mut Context<'_>, 697 ) -> Poll<Result<(), DispatchErrorKind>> { 698 if self.exit_with_error(cx, DispatchErrorKind::ChannelClosed) { 699 self.state = ManagerState::Send; 700 self.next_state = ManagerState::Exit(DispatchErrorKind::ChannelClosed); 701 Poll::Pending 702 } else { 703 Poll::Ready(Err(DispatchErrorKind::ChannelClosed)) 704 } 705 } 706 poll_blocked_framesnull707 fn poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()> { 708 match self.controller.poll_blocked_message(cx, &self.input_tx) { 709 Poll::Ready(_) => { 710 self.state = self.next_state; 711 // Reset state. 712 self.next_state = ManagerState::Receive; 713 Poll::Ready(()) 714 } 715 Poll::Pending => Poll::Pending, 716 } 717 } 718 719 pub(crate) fn exit_with_error( 720 &mut self, 721 cx: &mut Context<'_>, 722 error: DispatchErrorKind, 723 ) -> bool { 724 self.controller.shutdown(); 725 self.req_rx.close(); 726 self.controller.streams.clear_streams_states(); 727 728 let ids = self.controller.streams.get_all_unclosed_streams(); 729 let mut blocked = false; 730 for stream_id in ids { 731 match self.controller.send_message_to_stream( 732 cx, 733 stream_id, 734 RespMessage::OutputExit(error), 735 ) { 736 // ignore error when going away. 737 Poll::Ready(_) => {} 738 Poll::Pending => { 739 blocked = true; 740 } 741 } 742 } 743 blocked 744 } 745 } 746