1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Stream Manager module.
15 
16 use std::future::Future;
17 use std::pin::Pin;
18 use std::sync::atomic::{AtomicBool, Ordering};
19 use std::sync::{Arc, Mutex};
20 use std::task::{Context, Poll};
21 
22 use quiche::Shutdown;
23 use ylong_http::h3::{
24     Frame, FrameDecoder, FrameEncoder, FrameKind, Frames, H3Error, H3ErrorCode, Headers, Payload,
25     Settings, StreamMessage, CONTROL_STREAM_TYPE, QPACK_DECODER_STREAM_TYPE,
26     QPACK_ENCODER_STREAM_TYPE, SETTINGS_FRAME_TYPE,
27 };
28 
29 use crate::async_impl::QuicConn;
30 use crate::runtime::{UnboundedReceiver, UnboundedSender};
31 use crate::util::config::H3Config;
32 use crate::util::dispatcher::http3::{DispatchErrorKind, ReqMessage, RespMessage};
33 use crate::util::h3::streams::{DataReadState, QUICStreamType, Streams};
34 
35 pub(crate) const UPD_RECV_BUF_SIZE: usize = 65535;
36 const DECODE_BUF_SIZE: usize = 1024;
37 
38 pub(crate) struct StreamManager {
39     pub(crate) streams: Streams,
40     pub(crate) quic_conn: Arc<Mutex<QuicConn>>,
41     pub(crate) io_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>,
42     pub(crate) stream_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>,
43     pub(crate) req_rx: UnboundedReceiver<ReqMessage>,
44     pub(crate) stream_recv_buf: [u8; UPD_RECV_BUF_SIZE],
45     pub(crate) encoder: FrameEncoder,
46     pub(crate) decoder: FrameDecoder,
47     pub(crate) encoder_buf: [u8; DECODE_BUF_SIZE],
48     pub(crate) inst_buf: [u8; DECODE_BUF_SIZE],
49     pub(crate) peer_settings: Option<Settings>,
50     pub(crate) io_shutdown: Arc<AtomicBool>,
51     pub(crate) io_goaway: Arc<AtomicBool>,
52 }
53 
54 impl StreamManager {
55     pub(crate) fn new(
56         quic_conn: Arc<Mutex<QuicConn>>,
57         io_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>,
58         stream_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>,
59         req_rx: UnboundedReceiver<ReqMessage>,
60         decoder: FrameDecoder,
61         io_shutdown: Arc<AtomicBool>,
62         io_goaway: Arc<AtomicBool>,
63     ) -> Self {
64         Self {
65             streams: Streams::new(),
66             quic_conn,
67             io_manager_tx,
68             stream_manager_rx,
69             req_rx,
70             stream_recv_buf: [0u8; UPD_RECV_BUF_SIZE],
71             encoder_buf: [0u8; DECODE_BUF_SIZE],
72             inst_buf: [0u8; DECODE_BUF_SIZE],
73             encoder: FrameEncoder::default(),
74             decoder,
75             peer_settings: None,
76             io_shutdown,
77             io_goaway,
78         }
79     }
80 
poll_recv_signalnull81     fn poll_recv_signal(
82         &mut self,
83         cx: &mut Context<'_>,
84     ) -> Poll<Result<Result<(), DispatchErrorKind>, DispatchErrorKind>> {
85         #[cfg(feature = "tokio_base")]
86         match self.stream_manager_rx.poll_recv(cx) {
87             Poll::Ready(None) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)),
88             Poll::Ready(Some(data)) => Poll::Ready(Ok(data)),
89             Poll::Pending => Poll::Pending,
90         }
91         #[cfg(feature = "ylong_base")]
92         match self.stream_manager_rx.poll_recv(cx) {
93             Poll::Ready(Err(_e)) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)),
94             Poll::Ready(Ok(data)) => Poll::Ready(Ok(data)),
95             Poll::Pending => Poll::Pending,
96         }
97     }
98 
poll_recv_requestnull99     fn poll_recv_request(
100         &mut self,
101         cx: &mut Context<'_>,
102     ) -> Poll<Result<ReqMessage, DispatchErrorKind>> {
103         #[cfg(feature = "tokio_base")]
104         match self.req_rx.poll_recv(cx) {
105             Poll::Ready(None) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)),
106             Poll::Ready(Some(data)) => Poll::Ready(Ok(data)),
107             Poll::Pending => Poll::Pending,
108         }
109         #[cfg(feature = "ylong_base")]
110         match self.req_rx.poll_recv(cx) {
111             Poll::Ready(Err(_e)) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)),
112             Poll::Ready(Ok(data)) => Poll::Ready(Ok(data)),
113             Poll::Pending => Poll::Pending,
114         }
115     }
116 
send_inst_to_peernull117     fn send_inst_to_peer(
118         &mut self,
119         headers: &Headers,
120         quic_conn: &mut QuicConn,
121     ) -> Result<(), DispatchErrorKind> {
122         if let Some(vec) = headers.get_instruction() {
123             let qpack_decode_stream_id =
124                 self.streams
125                     .qpack_decode_stream_id()
126                     .ok_or(DispatchErrorKind::H3(H3Error::Connection(
127                         H3ErrorCode::H3InternalError,
128                     )))?;
129             quic_conn.stream_send(qpack_decode_stream_id, vec, false)?;
130         }
131         Ok(())
132     }
133 
transmit_errornull134     fn transmit_error(
135         &mut self,
136         cx: &mut Context<'_>,
137         id: u64,
138         error: DispatchErrorKind,
139     ) -> Result<(), DispatchErrorKind> {
140         self.streams.send_error(cx, id, error)
141     }
142 
poll_input_requestnull143     fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {
144         self.streams.try_consume_pending_concurrency();
145         let len = self.streams.pending_stream_len();
146         // Some streams may be blocked due to the server not reading the message. Avoid
147         // reading these streams twice in one loop
148         for _ in 0..len {
149             if let Some(id) = self.streams.next_stream() {
150                 self.input_stream_frame(cx, id)?;
151             } else {
152                 break;
153             }
154         }
155         Ok(())
156     }
157 
input_stream_framenull158     fn input_stream_frame(
159         &mut self,
160         cx: &mut Context<'_>,
161         id: u64,
162     ) -> Result<(), DispatchErrorKind> {
163         if let Some(header) = self.streams.get_header(id)? {
164             self.poll_send_header(id, header)?;
165         }
166 
167         // encoding means last frame is still encoding, can not create new frame before
168         // consumed.
169         if self.streams.encoding(id)? {
170             if let Err(e) = self.poll_send_frame(id, None) {
171                 return match e {
172                     DispatchErrorKind::Quic(quiche::Error::StreamStopped(_)) => Ok(()),
173                     e => Err(e),
174                 };
175             }
176             if self.streams.encoding(id)? {
177                 self.streams.push_back_pending_send(id);
178                 return Ok(());
179             }
180         }
181 
182         loop {
183             match self.poll_read_body(cx, id)? {
184                 DataReadState::Closed | DataReadState::Pending => {
185                     break;
186                 }
187                 DataReadState::Ready(data) => {
188                     if let Err(e) = self.poll_send_frame(id, Some(*data)) {
189                         return match e {
190                             DispatchErrorKind::Quic(quiche::Error::StreamStopped(_)) => Ok(()),
191                             e => Err(e),
192                         };
193                     }
194                     if self.streams.encoding(id)? {
195                         self.streams.push_back_pending_send(id);
196                         break;
197                     }
198                 }
199                 DataReadState::Finish => {
200                     let mut quic_conn = self.quic_conn.lock().unwrap();
201                     quic_conn.stream_send(id, b"", true)?;
202                     let _ = self.io_manager_tx.send(Ok(()));
203                     break;
204                 }
205             }
206         }
207         Ok(())
208     }
209 
poll_send_headernull210     fn poll_send_header(&mut self, id: u64, frame: Frame) -> Result<(), DispatchErrorKind> {
211         self.streams.set_encoding(id, true)?;
212         self.encoder.set_frame(id, frame)?;
213         let quic_conn = self.quic_conn.clone();
214         let qpack_encode_stream_id =
215             self.streams
216                 .qpack_encode_stream_id()
217                 .ok_or(DispatchErrorKind::H3(H3Error::Connection(
218                     H3ErrorCode::H3InternalError,
219                 )))?;
220         let mut quic_conn = quic_conn.lock().unwrap();
221 
222         // invalid means stream has not been created, create it first
223         if let Err(quiche::Error::InvalidStreamState(_)) =
224             quic_conn.stream_writable(id, DECODE_BUF_SIZE)
225         {
226             quic_conn.stream_send(id, b"", false)?;
227         }
228         while quic_conn.stream_writable(id, DECODE_BUF_SIZE)?
229             && quic_conn.stream_writable(qpack_encode_stream_id, DECODE_BUF_SIZE)?
230         {
231             let (data_size, inst_size) =
232                 self.encoder
233                     .encode(id, &mut self.encoder_buf, &mut self.inst_buf)?;
234             if inst_size != 0 {
235                 quic_conn.stream_send(
236                     qpack_encode_stream_id,
237                     &self.inst_buf[..inst_size],
238                     false,
239                 )?;
240             }
241             if data_size != 0 {
242                 quic_conn.stream_send(id, &self.encoder_buf[..data_size], false)?;
243             }
244             if inst_size == 0 && data_size == 0 {
245                 self.streams.set_encoding(id, false)?;
246                 break;
247             }
248         }
249 
250         let _ = self.io_manager_tx.send(Ok(()));
251         Ok(())
252     }
253 
poll_send_framenull254     fn poll_send_frame(&mut self, id: u64, frame: Option<Frame>) -> Result<(), DispatchErrorKind> {
255         if let Some(frame) = frame {
256             self.streams.set_encoding(id, true)?;
257             self.encoder.set_frame(id, frame)?;
258         }
259         let mut quic_conn = self.quic_conn.lock().unwrap();
260 
261         loop {
262             if !quic_conn.stream_writable(id, DECODE_BUF_SIZE)? {
263                 break;
264             }
265             let (data_size, _) =
266                 self.encoder
267                     .encode(id, &mut self.encoder_buf, &mut self.inst_buf)?;
268             if data_size != 0 {
269                 quic_conn.stream_send(id, &self.encoder_buf[..data_size], false)?;
270                 let _ = self.io_manager_tx.send(Ok(()));
271             } else {
272                 self.streams.set_encoding(id, false)?;
273                 break;
274             }
275         }
276         Ok(())
277     }
278 
279     pub(crate) fn poll_read_body(
280         &mut self,
281         cx: &mut Context<'_>,
282         id: u64,
283     ) -> Result<DataReadState, DispatchErrorKind> {
284         const DEFAULT_MAX_FRAME_SIZE: usize = 16 * 1024;
285         let len = std::cmp::min(
286             self.quic_conn
287                 .lock()
288                 .unwrap()
289                 .stream_capacity(id)
290                 .map_err(|_| {
291                     DispatchErrorKind::H3(H3Error::Stream(id, H3ErrorCode::H3InternalError))
292                 })?,
293             DEFAULT_MAX_FRAME_SIZE,
294         );
295         let mut buf = [0u8; DEFAULT_MAX_FRAME_SIZE];
296         self.streams.poll_sized_data(cx, id, &mut buf[..len])
297     }
298 
299     pub(crate) fn init(&mut self, config: H3Config) -> Result<(), DispatchErrorKind> {
300         self.decoder
301             .local_allowed_max_field_section_size(config.max_field_section_size() as usize);
302         self.send_settings(config)?;
303         self.open_uni_stream(QPACK_ENCODER_STREAM_TYPE)?;
304         self.open_uni_stream(QPACK_DECODER_STREAM_TYPE)?;
305         Ok(())
306     }
307 
308     pub(crate) fn open_uni_stream(&mut self, stream_type: u8) -> Result<u64, DispatchErrorKind> {
309         let buf = [stream_type];
310         let id = match stream_type {
311             CONTROL_STREAM_TYPE => {
312                 self.streams
313                     .control_stream_id()
314                     .ok_or(DispatchErrorKind::H3(H3Error::Connection(
315                         H3ErrorCode::H3InternalError,
316                     )))?
317             }
318             QPACK_ENCODER_STREAM_TYPE => {
319                 self.streams
320                     .qpack_encode_stream_id()
321                     .ok_or(DispatchErrorKind::H3(H3Error::Connection(
322                         H3ErrorCode::H3InternalError,
323                     )))?
324             }
325             QPACK_DECODER_STREAM_TYPE => {
326                 self.streams
327                     .qpack_decode_stream_id()
328                     .ok_or(DispatchErrorKind::H3(H3Error::Connection(
329                         H3ErrorCode::H3InternalError,
330                     )))?
331             }
332             _ => {
333                 return Err(DispatchErrorKind::H3(H3Error::Connection(
334                     H3ErrorCode::H3InternalError,
335                 )))
336             }
337         };
338         let mut quic_conn = self.quic_conn.lock().unwrap();
339 
340         quic_conn.stream_send(id, &buf, false)?;
341         let _ = quic_conn.stream_priority(id, 0, false);
342         Ok(id)
343     }
344 
345     pub(crate) fn send_settings(&mut self, config: H3Config) -> Result<(), DispatchErrorKind> {
346         let control_stream_id = self.open_uni_stream(CONTROL_STREAM_TYPE)?;
347 
348         let mut settings = Settings::default();
349         settings.set_max_field_section_size(config.max_field_section_size());
350         settings.set_qpack_max_table_capacity(config.qpack_max_table_capacity());
351         settings.set_qpack_block_stream(config.qpack_blocked_streams());
352 
353         let mut quic_conn = self.quic_conn.lock().unwrap();
354         let settings = Frame::new(SETTINGS_FRAME_TYPE, Payload::Settings(settings));
355         self.encoder.set_frame(control_stream_id, settings)?;
356         loop {
357             let (size, _) = self.encoder.encode(
358                 control_stream_id,
359                 &mut self.encoder_buf,
360                 &mut self.inst_buf,
361             )?;
362             if size == 0 {
363                 return Ok(());
364             }
365             quic_conn.stream_send(control_stream_id, &self.encoder_buf[..size], false)?;
366         }
367     }
368 
369     pub(crate) fn poll_stream_recv(
370         &mut self,
371         cx: &mut Context<'_>,
372     ) -> Result<(), DispatchErrorKind> {
373         let mut need_send = false;
374         let lock = self.quic_conn.clone();
375         let mut quic_conn = lock.lock().unwrap();
376 
377         if let Some(stream_id) = self.streams.peer_control_stream_id() {
378             need_send |= self.try_recv_uni_stream(cx, &mut quic_conn, stream_id)?;
379         };
380         if let Some(stream_id) = self.streams.peer_qpack_encode_stream_id() {
381             need_send |= self.try_recv_uni_stream(cx, &mut quic_conn, stream_id)?;
382         };
383         if let Some(stream_id) = self.streams.peer_qpack_decode_stream_id() {
384             need_send |= self.try_recv_uni_stream(cx, &mut quic_conn, stream_id)?;
385         };
386         for id in quic_conn.readable() {
387             if !self.streams.frame_acceptable(id) {
388                 continue;
389             }
390             need_send |= self.read_stream(cx, &mut quic_conn, id)?;
391         }
392 
393         if quic_conn.is_closed() {
394             self.shutdown(cx, &DispatchErrorKind::Disconnect);
395         }
396 
397         if need_send {
398             let _ = self.io_manager_tx.send(Ok(()));
399         }
400         Ok(())
401     }
402 
try_recv_uni_streamnull403     fn try_recv_uni_stream(
404         &mut self,
405         cx: &mut Context<'_>,
406         quic_conn: &mut QuicConn,
407         stream_id: u64,
408     ) -> Result<bool, DispatchErrorKind> {
409         if quic_conn.stream_finished(stream_id) {
410             return Err(DispatchErrorKind::H3(H3Error::Connection(
411                 H3ErrorCode::H3ClosedCriticalStream,
412             )));
413         }
414 
415         match self.read_stream(cx, quic_conn, stream_id) {
416             Ok(need_send) => {
417                 if quic_conn.stream_finished(stream_id) {
418                     return Err(DispatchErrorKind::H3(H3Error::Connection(
419                         H3ErrorCode::H3ClosedCriticalStream,
420                     )));
421                 }
422                 Ok(need_send)
423             }
424             Err(e) => Err(e),
425         }
426     }
427 
read_streamnull428     fn read_stream(
429         &mut self,
430         cx: &mut Context<'_>,
431         quic_conn: &mut QuicConn,
432         id: u64,
433     ) -> Result<bool, DispatchErrorKind> {
434         if QUICStreamType::from(id) == QUICStreamType::ServerInitialBidirectional {
435             return Err(DispatchErrorKind::H3(H3Error::Connection(
436                 H3ErrorCode::H3StreamCreationError,
437             )));
438         }
439         let mut need_send = false;
440         loop {
441             let (size, fin) = match quic_conn.stream_recv(id, &mut self.stream_recv_buf) {
442                 Ok((size, fin)) => {
443                     need_send = true;
444                     (size, fin)
445                 }
446                 Err(quiche::Error::Done) => {
447                     return Ok(need_send);
448                 }
449                 Err(quiche::Error::StreamStopped(err)) | Err(quiche::Error::StreamReset(err)) => {
450                     if err != H3ErrorCode::H3NoError as u64 {
451                         return Err(DispatchErrorKind::H3(H3Error::Stream(id, err.into())));
452                     } else {
453                         return Ok(false);
454                     }
455                 }
456                 Err(e) => {
457                     return Err(DispatchErrorKind::Quic(e));
458                 }
459             };
460             self.process_recv_data(cx, id, size, quic_conn)?;
461             if fin {
462                 self.finish_stream(cx, id)?;
463                 return Ok(true);
464             }
465         }
466     }
467 
process_recv_datanull468     fn process_recv_data(
469         &mut self,
470         cx: &mut Context<'_>,
471         id: u64,
472         size: usize,
473         quic_conn: &mut QuicConn,
474     ) -> Result<(), DispatchErrorKind> {
475         let mut stream_id = id;
476         let mut size = size;
477         loop {
478             match self
479                 .decoder
480                 .decode(stream_id, &self.stream_recv_buf[..size])
481             {
482                 Ok(StreamMessage::Request(frames)) => {
483                     self.recv_request_stream(cx, stream_id, frames, quic_conn)?;
484                 }
485                 Ok(StreamMessage::Push(_id, _frames)) => {
486                     // MAX_PUSH_ID not send, Push Stream means error
487                     return Err(DispatchErrorKind::H3(H3Error::Connection(
488                         H3ErrorCode::H3IdError,
489                     )));
490                 }
491                 Ok(StreamMessage::QpackDecoder(order)) => {
492                     self.recv_qpack_decode_stream(stream_id, order)?;
493                 }
494                 Ok(StreamMessage::Control(frames)) => {
495                     self.recv_control_stream(cx, stream_id, frames)?;
496                 }
497                 Ok(StreamMessage::WaitingMore) | Ok(StreamMessage::Unknown) => {}
498                 Ok(StreamMessage::QpackEncoder(vec)) => {
499                     self.recv_qpack_encode_stream(stream_id, vec)?;
500                 }
501                 Err(e) => {
502                     self.transmit_error(cx, stream_id, DispatchErrorKind::H3(e))?;
503                 }
504             }
505             if let Some(id) = self.streams.get_resume_stream_id() {
506                 stream_id = id;
507             } else {
508                 return Ok(());
509             };
510             size = 0;
511         }
512     }
513 
recv_qpack_encode_streamnull514     fn recv_qpack_encode_stream(
515         &mut self,
516         stream_id: u64,
517         vec: Vec<u64>,
518     ) -> Result<(), DispatchErrorKind> {
519         self.streams.set_peer_qpack_encode_stream_id(stream_id)?;
520         for resume_id in vec {
521             self.streams.resume_stream_recv(resume_id);
522         }
523         Ok(())
524     }
525 
recv_request_streamnull526     fn recv_request_stream(
527         &mut self,
528         cx: &mut Context<'_>,
529         id: u64,
530         frames: Frames,
531         quic_conn: &mut QuicConn,
532     ) -> Result<(), DispatchErrorKind> {
533         for kind in frames.into_iter() {
534             let frame = match kind {
535                 FrameKind::Complete(frame) => frame,
536                 FrameKind::Blocked => {
537                     self.streams.pend_stream_recv(id);
538                     return Ok(());
539                 }
540                 FrameKind::Partial => return Ok(()),
541             };
542             match frame.payload() {
543                 Payload::Headers(headers) => {
544                     self.send_inst_to_peer(headers, quic_conn)?;
545                     self.streams.send_frame(cx, id, *frame)?;
546                 }
547                 Payload::Data(_) => {
548                     self.streams.send_frame(cx, id, *frame)?;
549                 }
550                 Payload::PushPromise(_) => {
551                     return Err(DispatchErrorKind::H3(H3Error::Connection(
552                         H3ErrorCode::H3IdError,
553                     )))
554                 }
555                 _ => {
556                     return Err(DispatchErrorKind::H3(H3Error::Connection(
557                         H3ErrorCode::H3FrameUnexpected,
558                     )))
559                 }
560             }
561         }
562         Ok(())
563     }
564 
recv_control_streamnull565     fn recv_control_stream(
566         &mut self,
567         cx: &mut Context<'_>,
568         id: u64,
569         frames: Frames,
570     ) -> Result<(), DispatchErrorKind> {
571         let mut is_first_frame = if let Some(stream_id) = self.streams.peer_control_stream_id() {
572             if stream_id != id {
573                 return Err(DispatchErrorKind::H3(H3Error::Connection(
574                     H3ErrorCode::H3StreamCreationError,
575                 )));
576             }
577             false
578         } else {
579             self.streams.set_peer_control_stream_id(id)?;
580             true
581         };
582         for frame in frames.iter() {
583             let FrameKind::Complete(frame) = frame else {
584                 continue;
585             };
586             match frame.payload() {
587                 Payload::Settings(settings) => {
588                     self.recv_setting_frame(settings)?;
589                     is_first_frame = false;
590                 }
591                 Payload::Goaway(goaway) => {
592                     self.recv_goaway_frame(cx, *goaway.get_id())?;
593                 }
594                 Payload::CancelPush(_cancel) => {
595                     return Err(DispatchErrorKind::H3(H3Error::Connection(
596                         H3ErrorCode::H3IdError,
597                     )));
598                 }
599                 _ => {
600                     return Err(DispatchErrorKind::H3(H3Error::Connection(
601                         H3ErrorCode::H3FrameUnexpected,
602                     )));
603                 }
604             }
605             if is_first_frame {
606                 return Err(DispatchErrorKind::H3(H3Error::Connection(
607                     H3ErrorCode::H3MissingSettings,
608                 )));
609             }
610         }
611         Ok(())
612     }
613 
recv_qpack_decode_streamnull614     fn recv_qpack_decode_stream(
615         &mut self,
616         stream_id: u64,
617         order: Vec<u8>,
618     ) -> Result<(), DispatchErrorKind> {
619         self.streams.set_peer_qpack_decode_stream_id(stream_id)?;
620         self.encoder.decode_remote_inst(&order)?;
621         Ok(())
622     }
623 
recv_setting_framenull624     fn recv_setting_frame(&mut self, settings: &Settings) -> Result<(), DispatchErrorKind> {
625         if self.peer_settings.is_some() {
626             return Err(DispatchErrorKind::H3(H3Error::Connection(
627                 H3ErrorCode::H3FrameUnexpected,
628             )));
629         }
630         self.peer_settings = Some(settings.clone());
631         if let Some(value) = settings.qpack_max_table_capacity() {
632             self.encoder.set_max_table_capacity(value as usize)?;
633         }
634         if let Some(value) = settings.qpack_block_stream() {
635             self.encoder.set_max_blocked_stream_size(value as usize);
636         }
637         Ok(())
638     }
639 
recv_goaway_framenull640     fn recv_goaway_frame(
641         &mut self,
642         cx: &mut Context<'_>,
643         goaway_id: u64,
644     ) -> Result<(), DispatchErrorKind> {
645         self.io_goaway.store(true, Ordering::Relaxed);
646         self.req_rx.close();
647         self.streams.goaway(cx, goaway_id)?;
648         Ok(())
649     }
650 
handle_errornull651     fn handle_error(&mut self, cx: &mut Context<'_>, err: &DispatchErrorKind) -> bool {
652         match err {
653             DispatchErrorKind::H3(H3Error::Stream(id, e)) => {
654                 self.handle_stream_error(cx, *id, e);
655                 false
656             }
657             DispatchErrorKind::Quic(quiche::Error::InvalidStreamState(id)) => {
658                 self.handle_stream_error(cx, *id, &H3ErrorCode::H3NoError);
659                 false
660             }
661             err => {
662                 self.handle_connection_error(cx, err);
663                 true
664             }
665         }
666     }
667 
handle_stream_errornull668     fn handle_stream_error(&mut self, cx: &mut Context<'_>, id: u64, err: &H3ErrorCode) {
669         let _ = self
670             .quic_conn
671             .lock()
672             .unwrap()
673             .stream_shutdown(id, Shutdown::Read, *err as u64);
674         self.streams.shutdown_stream(cx, id, err);
675     }
676 
handle_connection_errornull677     fn handle_connection_error(&mut self, cx: &mut Context<'_>, err: &DispatchErrorKind) {
678         self.shutdown(cx, err);
679         let err = match err {
680             DispatchErrorKind::H3(H3Error::Connection(err)) => *err,
681             _ => H3ErrorCode::H3InternalError,
682         };
683         let _ = self.quic_conn.lock().unwrap().close(true, err as u64, b"");
684         let _ = self.io_manager_tx.send(Ok(()));
685         self.req_rx.close();
686     }
687 
shutdownnull688     fn shutdown(&mut self, cx: &mut Context<'_>, err: &DispatchErrorKind) {
689         self.io_shutdown.store(true, Ordering::Relaxed);
690         self.streams.shutdown(cx, err);
691     }
692 
finish_streamnull693     fn finish_stream(&mut self, cx: &mut Context<'_>, id: u64) -> Result<(), DispatchErrorKind> {
694         self.streams.finish_stream(cx, id)?;
695         self.encoder.finish_stream(id)?;
696         self.decoder.finish_stream(id)?;
697         if self.streams.goaway_id().is_some() && self.streams.current_concurrency() == 0 {
698             self.io_shutdown.store(true, Ordering::Relaxed);
699         }
700         Ok(())
701     }
702 
703     pub(crate) fn poll_blocked_message(
704         &mut self,
705         cx: &mut Context<'_>,
706     ) -> Poll<Result<(), DispatchErrorKind>> {
707         self.streams.poll_blocked_message(cx)
708     }
709 }
710 
711 impl Future for StreamManager {
712     type Output = Result<(), DispatchErrorKind>;
713 
pollnull714     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
715         let this = self.get_mut();
716         loop {
717             // 1 recv stream_manager_rx, meaning data to send/recv
718             match this.poll_recv_signal(cx) {
719                 // consume all the signals
720                 Poll::Ready(Ok(Ok(()))) => continue,
721                 Poll::Ready(Ok(Err(e))) | Poll::Ready(Err(e)) => {
722                     if this.handle_error(cx, &e) {
723                         return Poll::Ready(Err(e));
724                     }
725                 }
726                 Poll::Pending => {}
727             }
728 
729             // 2 check id's channel sendable / control/qpack/push, decode, send or cache
730             // frame if stream_recv, send io_manager_tx to io manager
731             if let Err(e) = this.poll_stream_recv(cx) {
732                 if this.handle_error(cx, &e) {
733                     return Poll::Ready(Err(e));
734                 }
735             }
736 
737             if let Poll::Ready(Err(e)) = this.poll_blocked_message(cx) {
738                 if this.handle_error(cx, &e) {
739                     return Poll::Ready(Err(e));
740                 }
741             }
742 
743             // 3 recv req_rx, check concurrency
744             loop {
745                 let req = match this.poll_recv_request(cx) {
746                     Poll::Ready(Ok(req)) => req,
747                     Poll::Ready(Err(e)) => {
748                         if this.handle_error(cx, &e) {
749                             return Poll::Ready(Err(e));
750                         }
751                         break;
752                     }
753                     Poll::Pending => break,
754                 };
755                 if let Err(e) = this.streams.new_unidirectional_stream(
756                     req.request.header,
757                     req.request.data,
758                     req.frame_tx.clone(),
759                 ) {
760                     let _ = req.frame_tx.try_send(RespMessage::OutputExit(e));
761                 }
762             }
763 
764             // 4 in concurrency stream, set frame to encoder, set encoding flag, get encode
765             // result send flag to io manager
766             if let Err(e) = this.poll_input_request(cx) {
767                 if this.handle_error(cx, &e) {
768                     return Poll::Ready(Err(e));
769                 }
770             }
771             return Poll::Pending;
772         }
773     }
774 }
775