1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Streams manage coroutine.
15 
16 use std::future::Future;
17 use std::pin::Pin;
18 use std::sync::{Arc, Mutex};
19 use std::task::{Context, Poll};
20 
21 use ylong_http::h2::{
22     ErrorCode, Frame, FrameFlags, Goaway, H2Error, Payload, Ping, RstStream, Setting, StreamId,
23 };
24 
25 use crate::runtime::{BoundedReceiver, UnboundedReceiver, UnboundedSender};
26 use crate::util::dispatcher::http2::{
27     DispatchErrorKind, OutputMessage, ReqMessage, RespMessage, SettingsState, SettingsSync,
28     StreamController,
29 };
30 use crate::util::h2::streams::{DataReadState, FrameRecvState, StreamEndState};
31 
32 #[derive(Copy, Clone)]
33 enum ManagerState {
34     Send,
35     Receive,
36     Exit(DispatchErrorKind),
37 }
38 
39 pub(crate) struct ConnManager {
40     state: ManagerState,
41     next_state: ManagerState,
42     // Synchronize SETTINGS frames sent by the client.
43     settings: Arc<Mutex<SettingsSync>>,
44     // channel transmitter between manager and io input.
45     input_tx: UnboundedSender<Frame>,
46     // channel receiver between manager and io output.
47     resp_rx: BoundedReceiver<OutputMessage>,
48     // channel receiver between manager and stream coroutine.
49     req_rx: UnboundedReceiver<ReqMessage>,
50     controller: StreamController,
51     handshakes: HandShakes,
52 }
53 
54 struct HandShakes {
55     local: bool,
56     peer: bool,
57 }
58 
59 impl Future for ConnManager {
60     type Output = Result<(), DispatchErrorKind>;
61 
pollnull62     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63         let manager = self.get_mut();
64         loop {
65             match manager.state {
66                 ManagerState::Send => {
67                     if manager.poll_blocked_frames(cx).is_pending() {
68                         return Poll::Pending;
69                     }
70                 }
71                 ManagerState::Receive => {
72                     // Receives a response frame from io output.
73                     match manager.resp_rx.poll_recv(cx) {
74                         #[cfg(feature = "tokio_base")]
75                         Poll::Ready(Some(message)) => match message {
76                             OutputMessage::Output(frame) => {
77                                 if manager.poll_recv_message(cx, frame)?.is_pending() {
78                                     return Poll::Pending;
79                                 }
80                             }
81                             // io output occurs error.
82                             OutputMessage::OutputExit(e) => {
83                                 // Note error returned immediately.
84                                 if manager.manage_resp_error(cx, e)?.is_pending() {
85                                     return Poll::Pending;
86                                 }
87                             }
88                         },
89                         #[cfg(feature = "ylong_base")]
90                         Poll::Ready(Ok(message)) => match message {
91                             OutputMessage::Output(frame) => {
92                                 if manager.poll_recv_message(cx, frame)?.is_pending() {
93                                     return Poll::Pending;
94                                 }
95                             }
96                             // io output occurs error.
97                             OutputMessage::OutputExit(e) => {
98                                 if manager.manage_resp_error(cx, e)?.is_pending() {
99                                     return Poll::Pending;
100                                 }
101                             }
102                         },
103                         #[cfg(feature = "tokio_base")]
104                         Poll::Ready(None) => {
105                             return manager.poll_channel_closed_exit(cx);
106                         }
107                         #[cfg(feature = "ylong_base")]
108                         Poll::Ready(Err(_e)) => {
109                             return manager.poll_channel_closed_exit(cx);
110                         }
111 
112                         Poll::Pending => {
113                             // TODO manage error state.
114                             return manager.manage_pending_state(cx);
115                         }
116                     }
117                 }
118                 ManagerState::Exit(e) => return Poll::Ready(Err(e)),
119             }
120         }
121     }
122 }
123 
124 impl ConnManager {
125     pub(crate) fn new(
126         settings: Arc<Mutex<SettingsSync>>,
127         input_tx: UnboundedSender<Frame>,
128         resp_rx: BoundedReceiver<OutputMessage>,
129         req_rx: UnboundedReceiver<ReqMessage>,
130         controller: StreamController,
131     ) -> Self {
132         Self {
133             state: ManagerState::Receive,
134             next_state: ManagerState::Receive,
135             settings,
136             input_tx,
137             resp_rx,
138             req_rx,
139             controller,
140             handshakes: HandShakes {
141                 local: false,
142                 peer: false,
143             },
144         }
145     }
146 
manage_pending_statenull147     fn manage_pending_state(
148         &mut self,
149         cx: &mut Context<'_>,
150     ) -> Poll<Result<(), DispatchErrorKind>> {
151         // The manager previously accepted a GOAWAY Frame.
152         if let Some(code) = self.controller.recved_go_away {
153             self.poll_deal_with_go_away(code)?;
154         }
155         self.controller.streams.window_update_conn(&self.input_tx)?;
156         self.controller
157             .streams
158             .window_update_streams(&self.input_tx)?;
159         self.poll_recv_request(cx)?;
160         if self.handshakes.local && self.handshakes.peer {
161             self.poll_input_request(cx)?;
162         }
163         Poll::Pending
164     }
165 
poll_recv_requestnull166     fn poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {
167         loop {
168             #[cfg(feature = "tokio_base")]
169             let message = match self.req_rx.poll_recv(cx) {
170                 Poll::Ready(Some(message)) => message,
171                 Poll::Ready(None) => return Err(DispatchErrorKind::ChannelClosed),
172                 Poll::Pending => break,
173             };
174             #[cfg(feature = "ylong_base")]
175             let message = match self.req_rx.poll_recv(cx) {
176                 Poll::Ready(Ok(message)) => message,
177                 Poll::Ready(Err(_e)) => return Err(DispatchErrorKind::ChannelClosed),
178                 Poll::Pending => break,
179             };
180             let id = match self.controller.streams.generate_id() {
181                 Ok(id) => id,
182                 Err(e) => {
183                     let _ = message.sender.try_send(RespMessage::OutputExit(e));
184                     break;
185                 }
186             };
187             let headers = Frame::new(id, message.request.flag, message.request.payload);
188             if self.controller.streams.reach_max_concurrency()
189                 || !self.controller.streams.is_pending_concurrency_empty()
190             {
191                 self.controller.streams.push_pending_concurrency(id)
192             } else {
193                 self.controller.streams.increase_current_concurrency();
194                 self.controller.streams.push_back_pending_send(id)
195             }
196             self.controller.senders.insert(id, message.sender);
197             self.controller
198                 .streams
199                 .insert(id, headers, message.request.data);
200         }
201         Ok(())
202     }
203 
poll_input_requestnull204     fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {
205         self.controller.streams.try_consume_pending_concurrency();
206         let size = self.controller.streams.pending_stream_num();
207         let mut index = 0;
208         while index < size {
209             match self.controller.streams.next_pending_stream() {
210                 None => {
211                     break;
212                 }
213                 Some(id) => {
214                     self.input_stream_frame(cx, id)?;
215                 }
216             }
217             index += 1;
218         }
219         Ok(())
220     }
221 
input_stream_framenull222     fn input_stream_frame(
223         &mut self,
224         cx: &mut Context<'_>,
225         id: StreamId,
226     ) -> Result<(), DispatchErrorKind> {
227         match self.controller.streams.headers(id)? {
228             None => {}
229             Some(header) => {
230                 self.poll_send_frame(header)?;
231             }
232         }
233 
234         loop {
235             match self.controller.streams.poll_read_body(cx, id)? {
236                 DataReadState::Closed => {
237                     break;
238                 }
239                 DataReadState::Pending => {
240                     break;
241                 }
242                 DataReadState::Ready(data) => {
243                     self.poll_send_frame(data)?;
244                 }
245                 DataReadState::Finish(frame) => {
246                     self.poll_send_frame(frame)?;
247                     break;
248                 }
249             }
250         }
251         Ok(())
252     }
253 
poll_send_framenull254     fn poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
255         match frame.payload() {
256             Payload::Headers(_) => {
257                 if let FrameRecvState::Err(e) = self
258                     .controller
259                     .streams
260                     .send_headers_frame(frame.stream_id(), frame.flags().is_end_stream())
261                 {
262                     // Never return FrameRecvState::Ignore case.
263                     return Err(e.into());
264                 }
265             }
266             Payload::Data(_) => {
267                 if let FrameRecvState::Err(e) = self
268                     .controller
269                     .streams
270                     .send_data_frame(frame.stream_id(), frame.flags().is_end_stream())
271                 {
272                     // Never return FrameRecvState::Ignore case.
273                     return Err(e.into());
274                 }
275             }
276             _ => {}
277         }
278 
279         self.input_tx
280             .send(frame)
281             .map_err(|_e| DispatchErrorKind::ChannelClosed)
282     }
283 
poll_recv_framenull284     fn poll_recv_frame(
285         &mut self,
286         cx: &mut Context<'_>,
287         frame: Frame,
288     ) -> Poll<Result<(), DispatchErrorKind>> {
289         match frame.payload() {
290             Payload::Settings(_settings) => {
291                 self.recv_settings_frame(frame)?;
292             }
293             Payload::Ping(_ping) => {
294                 self.recv_ping_frame(frame)?;
295             }
296             Payload::PushPromise(_) => {
297                 // TODO The current settings_enable_push setting is fixed to false.
298                 return Poll::Ready(Err(
299                     H2Error::ConnectionError(ErrorCode::ProtocolError).into()
300                 ));
301             }
302             Payload::Goaway(_go_away) => {
303                 return self.recv_go_away_frame(cx, frame).map_err(Into::into);
304             }
305             Payload::RstStream(_reset) => {
306                 return self.recv_reset_frame(cx, frame).map_err(Into::into);
307             }
308             Payload::Headers(_headers) => {
309                 return self.recv_header_frame(cx, frame).map_err(Into::into);
310             }
311             Payload::Data(_data) => {
312                 return self.recv_data_frame(cx, frame).map_err(Into::into);
313             }
314             Payload::WindowUpdate(_windows) => {
315                 self.recv_window_frame(frame)?;
316             }
317             // Priority is no longer recommended, so keep it compatible but not processed.
318             Payload::Priority(_priority) => {}
319         }
320         Poll::Ready(Ok(()))
321     }
322 
recv_settings_framenull323     fn recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
324         let settings = if let Payload::Settings(settings) = frame.payload() {
325             settings
326         } else {
327             // this will not happen forever.
328             return Ok(());
329         };
330 
331         if frame.flags().is_ack() {
332             let mut connection = self.settings.lock().unwrap();
333 
334             if let SettingsState::Acknowledging(ref acknowledged) = connection.settings {
335                 for setting in acknowledged.get_settings() {
336                     if let Setting::InitialWindowSize(size) = setting {
337                         self.controller
338                             .streams
339                             .apply_recv_initial_window_size(*size);
340                     }
341                 }
342             }
343             connection.settings = SettingsState::Synced;
344             self.handshakes.local = true;
345             Ok(())
346         } else {
347             for setting in settings.get_settings() {
348                 if let Setting::MaxConcurrentStreams(num) = setting {
349                     self.controller.streams.apply_max_concurrent_streams(*num);
350                 }
351                 if let Setting::InitialWindowSize(size) = setting {
352                     self.controller
353                         .streams
354                         .apply_send_initial_window_size(*size)?;
355                 }
356             }
357 
358             // The reason for copying the payload is to pass information to the io input to
359             // set the frame encoder, and the input will empty the
360             // payload when it is sent
361             let new_settings = Frame::new(
362                 frame.stream_id(),
363                 FrameFlags::new(0x1),
364                 frame.payload().clone(),
365             );
366             self.input_tx
367                 .send(new_settings)
368                 .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
369             self.handshakes.peer = true;
370             Ok(())
371         }
372     }
373 
recv_ping_framenull374     fn recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
375         let ping = if let Payload::Ping(ping) = frame.payload() {
376             ping
377         } else {
378             // this will not happen forever.
379             return Ok(());
380         };
381         if frame.flags().is_ack() {
382             // TODO The client does not have the logic to send ping frames. Therefore, the
383             // ack ping is not processed.
384             Ok(())
385         } else {
386             self.input_tx
387                 .send(Ping::ack(ping.clone()))
388                 .map_err(|_e| DispatchErrorKind::ChannelClosed)
389         }
390     }
391 
recv_go_away_framenull392     fn recv_go_away_frame(
393         &mut self,
394         cx: &mut Context<'_>,
395         frame: Frame,
396     ) -> Poll<Result<(), H2Error>> {
397         let go_away = if let Payload::Goaway(goaway) = frame.payload() {
398             goaway
399         } else {
400             // this will not happen forever.
401             return Poll::Ready(Ok(()));
402         };
403         // Prevents the current connection from generating a new stream.
404         self.controller.shutdown();
405         self.req_rx.close();
406         let last_stream_id = go_away.get_last_stream_id();
407         let streams = self.controller.get_unsent_streams(last_stream_id)?;
408 
409         let error = H2Error::ConnectionError(ErrorCode::try_from(go_away.get_error_code())?);
410 
411         let mut blocked = false;
412         for stream_id in streams {
413             match self.controller.send_message_to_stream(
414                 cx,
415                 stream_id,
416                 RespMessage::OutputExit(error.into()),
417             ) {
418                 // ignore error when going away.
419                 Poll::Ready(_) => {}
420                 Poll::Pending => {
421                     blocked = true;
422                 }
423             }
424         }
425         // Exit after the allowed stream is complete.
426         self.controller.recved_go_away = Some(go_away.get_error_code());
427         if blocked {
428             Poll::Pending
429         } else {
430             Poll::Ready(Ok(()))
431         }
432     }
433 
recv_reset_framenull434     fn recv_reset_frame(
435         &mut self,
436         cx: &mut Context<'_>,
437         frame: Frame,
438     ) -> Poll<Result<(), H2Error>> {
439         match self.controller.streams.recv_remote_reset(frame.stream_id()) {
440             StreamEndState::OK => self.controller.send_message_to_stream(
441                 cx,
442                 frame.stream_id(),
443                 RespMessage::Output(frame),
444             ),
445             StreamEndState::Err(e) => Poll::Ready(Err(e)),
446             StreamEndState::Ignore => Poll::Ready(Ok(())),
447         }
448     }
449 
recv_header_framenull450     fn recv_header_frame(
451         &mut self,
452         cx: &mut Context<'_>,
453         frame: Frame,
454     ) -> Poll<Result<(), H2Error>> {
455         match self
456             .controller
457             .streams
458             .recv_headers(frame.stream_id(), frame.flags().is_end_stream())
459         {
460             FrameRecvState::OK => self.controller.send_message_to_stream(
461                 cx,
462                 frame.stream_id(),
463                 RespMessage::Output(frame),
464             ),
465             FrameRecvState::Err(e) => Poll::Ready(Err(e)),
466             FrameRecvState::Ignore => Poll::Ready(Ok(())),
467         }
468     }
469 
recv_data_framenull470     fn recv_data_frame(&mut self, cx: &mut Context<'_>, frame: Frame) -> Poll<Result<(), H2Error>> {
471         let data = if let Payload::Data(data) = frame.payload() {
472             data
473         } else {
474             // this will not happen forever.
475             return Poll::Ready(Ok(()));
476         };
477         let id = frame.stream_id();
478         let len = data.size() as u32;
479 
480         self.controller.streams.release_conn_recv_window(len)?;
481         self.controller
482             .streams
483             .release_stream_recv_window(id, len)?;
484 
485         match self
486             .controller
487             .streams
488             .recv_data(id, frame.flags().is_end_stream())
489         {
490             FrameRecvState::OK => self.controller.send_message_to_stream(
491                 cx,
492                 frame.stream_id(),
493                 RespMessage::Output(frame),
494             ),
495             FrameRecvState::Ignore => Poll::Ready(Ok(())),
496             FrameRecvState::Err(e) => Poll::Ready(Err(e)),
497         }
498     }
499 
recv_window_framenull500     fn recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
501         let windows = if let Payload::WindowUpdate(windows) = frame.payload() {
502             windows
503         } else {
504             // this will not happen forever.
505             return Ok(());
506         };
507         let id = frame.stream_id();
508         let increment = windows.get_increment();
509         if id == 0 {
510             self.controller
511                 .streams
512                 .increase_conn_send_window(increment)?;
513             self.controller.streams.reassign_conn_send_window();
514         } else {
515             self.controller
516                 .streams
517                 .reassign_stream_send_window(id, increment)?;
518         }
519         Ok(())
520     }
521 
manage_resp_errornull522     fn manage_resp_error(
523         &mut self,
524         cx: &mut Context<'_>,
525         kind: DispatchErrorKind,
526     ) -> Poll<Result<(), DispatchErrorKind>> {
527         match kind {
528             DispatchErrorKind::H2(h2) => match h2 {
529                 H2Error::StreamError(id, code) => self.manage_stream_error(cx, id, code),
530                 H2Error::ConnectionError(code) => self.manage_conn_error(cx, code),
531             },
532             other => {
533                 let blocked = self.exit_with_error(cx, other);
534                 if blocked {
535                     self.state = ManagerState::Send;
536                     self.next_state = ManagerState::Exit(other);
537                     Poll::Pending
538                 } else {
539                     Poll::Ready(Err(other))
540                 }
541             }
542         }
543     }
544 
manage_stream_errornull545     fn manage_stream_error(
546         &mut self,
547         cx: &mut Context<'_>,
548         id: StreamId,
549         code: ErrorCode,
550     ) -> Poll<Result<(), DispatchErrorKind>> {
551         let rest_payload = RstStream::new(code.into_code());
552         let frame = Frame::new(id, FrameFlags::empty(), Payload::RstStream(rest_payload));
553         match self.controller.streams.send_local_reset(id) {
554             StreamEndState::OK => {
555                 self.input_tx
556                     .send(frame)
557                     .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
558 
559                 match self.controller.send_message_to_stream(
560                     cx,
561                     id,
562                     RespMessage::OutputExit(DispatchErrorKind::ChannelClosed),
563                 ) {
564                     Poll::Ready(_) => {
565                         // error at the stream level due to early exit of the coroutine in which the
566                         // request is located, ignored to avoid manager coroutine exit.
567                         Poll::Ready(Ok(()))
568                     }
569                     Poll::Pending => {
570                         self.state = ManagerState::Send;
571                         // stream error will not cause manager exit with error(exit state). Takes
572                         // effect only if blocked.
573                         self.next_state = ManagerState::Receive;
574                         Poll::Pending
575                     }
576                 }
577             }
578             StreamEndState::Ignore => Poll::Ready(Ok(())),
579             StreamEndState::Err(e) => {
580                 // This error will never happen.
581                 Poll::Ready(Err(e.into()))
582             }
583         }
584     }
585 
manage_conn_errornull586     fn manage_conn_error(
587         &mut self,
588         cx: &mut Context<'_>,
589         code: ErrorCode,
590     ) -> Poll<Result<(), DispatchErrorKind>> {
591         // last_stream_id is set to 0 to ensure that all pushed streams are
592         // shutdown.
593         let go_away_payload = Goaway::new(
594             code.into_code(),
595             self.controller.streams.latest_remote_id,
596             vec![],
597         );
598         let frame = Frame::new(
599             0,
600             FrameFlags::empty(),
601             Payload::Goaway(go_away_payload.clone()),
602         );
603         // Avoid sending the same GO_AWAY frame multiple times.
604         if let Some(ref go_away) = self.controller.go_away_sync.going_away {
605             if go_away.get_error_code() == go_away_payload.get_error_code()
606                 && go_away.get_last_stream_id() == go_away_payload.get_last_stream_id()
607             {
608                 return Poll::Ready(Ok(()));
609             }
610         }
611         self.controller.go_away_sync.going_away = Some(go_away_payload);
612         self.input_tx
613             .send(frame)
614             .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
615 
616         let blocked =
617             self.exit_with_error(cx, DispatchErrorKind::H2(H2Error::ConnectionError(code)));
618 
619         if blocked {
620             self.state = ManagerState::Send;
621             self.next_state = ManagerState::Exit(H2Error::ConnectionError(code).into());
622             Poll::Pending
623         } else {
624             // TODO When current client has an error,
625             // it always sends the GO_AWAY frame at the first time and exits directly.
626             // Should we consider letting part of the unfinished stream complete?
627             Poll::Ready(Err(H2Error::ConnectionError(code).into()))
628         }
629     }
630 
poll_deal_with_go_awaynull631     fn poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind> {
632         // The client that receives GO_AWAY needs to return a GO_AWAY to the server
633         // before closed. The preceding operations before receiving the frame
634         // ensure that the connection is in the closing state.
635         if self.controller.streams.is_closed() {
636             let last_stream_id = self.controller.streams.latest_remote_id;
637             let go_away_payload = Goaway::new(error_code, last_stream_id, vec![]);
638             let frame = Frame::new(
639                 0,
640                 FrameFlags::empty(),
641                 Payload::Goaway(go_away_payload.clone()),
642             );
643 
644             self.send_peer_goaway(frame, go_away_payload, error_code)?;
645             return Err(H2Error::ConnectionError(ErrorCode::try_from(error_code)?).into());
646         }
647         Ok(())
648     }
649 
send_peer_goawaynull650     fn send_peer_goaway(
651         &mut self,
652         frame: Frame,
653         payload: Goaway,
654         err_code: u32,
655     ) -> Result<(), DispatchErrorKind> {
656         match self.controller.go_away_sync.going_away {
657             None => {
658                 self.controller.go_away_sync.going_away = Some(payload);
659                 self.input_tx
660                     .send(frame)
661                     .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
662             }
663             Some(ref go_away) => {
664                 // Whether the same GOAWAY Frame has been sent before.
665                 if !(go_away.get_error_code() == err_code
666                     && go_away.get_last_stream_id() == self.controller.streams.latest_remote_id)
667                 {
668                     self.controller.go_away_sync.going_away = Some(payload);
669                     self.input_tx
670                         .send(frame)
671                         .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
672                 }
673             }
674         }
675         Ok(())
676     }
677 
poll_recv_messagenull678     fn poll_recv_message(
679         &mut self,
680         cx: &mut Context<'_>,
681         frame: Frame,
682     ) -> Poll<Result<(), DispatchErrorKind>> {
683         match self.poll_recv_frame(cx, frame) {
684             Poll::Ready(Err(kind)) => self.manage_resp_error(cx, kind),
685             Poll::Pending => {
686                 self.state = ManagerState::Send;
687                 self.next_state = ManagerState::Receive;
688                 Poll::Pending
689             }
690             Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
691         }
692     }
693 
poll_channel_closed_exitnull694     fn poll_channel_closed_exit(
695         &mut self,
696         cx: &mut Context<'_>,
697     ) -> Poll<Result<(), DispatchErrorKind>> {
698         if self.exit_with_error(cx, DispatchErrorKind::ChannelClosed) {
699             self.state = ManagerState::Send;
700             self.next_state = ManagerState::Exit(DispatchErrorKind::ChannelClosed);
701             Poll::Pending
702         } else {
703             Poll::Ready(Err(DispatchErrorKind::ChannelClosed))
704         }
705     }
706 
poll_blocked_framesnull707     fn poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()> {
708         match self.controller.poll_blocked_message(cx, &self.input_tx) {
709             Poll::Ready(_) => {
710                 self.state = self.next_state;
711                 // Reset state.
712                 self.next_state = ManagerState::Receive;
713                 Poll::Ready(())
714             }
715             Poll::Pending => Poll::Pending,
716         }
717     }
718 
719     pub(crate) fn exit_with_error(
720         &mut self,
721         cx: &mut Context<'_>,
722         error: DispatchErrorKind,
723     ) -> bool {
724         self.controller.shutdown();
725         self.req_rx.close();
726         self.controller.streams.clear_streams_states();
727 
728         let ids = self.controller.streams.get_all_unclosed_streams();
729         let mut blocked = false;
730         for stream_id in ids {
731             match self.controller.send_message_to_stream(
732                 cx,
733                 stream_id,
734                 RespMessage::OutputExit(error),
735             ) {
736                 // ignore error when going away.
737                 Poll::Ready(_) => {}
738                 Poll::Pending => {
739                     blocked = true;
740                 }
741             }
742         }
743         blocked
744     }
745 }
746