1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Streams operations utils.
15 
16 use std::cmp::{min, Ordering};
17 use std::collections::{HashMap, HashSet, VecDeque};
18 use std::task::{Context, Poll};
19 
20 use ylong_http::h2::{Data, ErrorCode, Frame, FrameFlags, H2Error, Payload, StreamId};
21 
22 use crate::runtime::UnboundedSender;
23 use crate::util::data_ref::BodyDataRef;
24 use crate::util::dispatcher::http2::DispatchErrorKind;
25 use crate::util::h2::buffer::{FlowControl, RecvWindow, SendWindow};
26 
/// Initial value of `Streams::max_send_id`: `u32::MAX >> 1`, i.e. `2^31 - 1`.
pub(crate) const INITIAL_MAX_SEND_STREAM_ID: StreamId = u32::MAX >> 1;
/// Initial value of `Streams::max_recv_id`: `u32::MAX >> 1`, i.e. `2^31 - 1`.
pub(crate) const INITIAL_MAX_RECV_STREAM_ID: StreamId = u32::MAX >> 1;

// Upper bound checked by `Streams::generate_id` before handing out a new id.
const DEFAULT_MAX_STREAM_ID: StreamId = u32::MAX >> 1;
// Initial value of `Streams::latest_remote_id`.
const INITIAL_LATEST_REMOTE_ID: StreamId = 0;
// Concurrency limit used until `Streams::apply_max_concurrent_streams` replaces it.
const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 100;
33 
/// Outcome of applying a HEADERS or DATA frame to a stream's state machine
/// (see `send_headers_frame`, `send_data_frame`, `recv_headers`, `recv_data`).
pub(crate) enum FrameRecvState {
    /// The frame is valid for the stream's current state.
    OK,
    /// The frame should be dropped without raising an error.
    Ignore,
    /// The frame is not allowed in the current state.
    Err(H2Error),
}
39 
/// Result of trying to read the next chunk of a request body
/// (see `Streams::poll_read_body`).
pub(crate) enum DataReadState {
    /// The stream is already closed; there is nothing to send.
    Closed,
    // Wait for poll_read or wait for window.
    Pending,
    /// A DATA frame carrying body bytes, without the end-stream flag.
    Ready(Frame),
    /// An empty DATA frame with the end-stream flag set: the body is finished.
    Finish(Frame),
}
47 
/// Outcome of resetting a stream
/// (see `Streams::send_local_reset` and `Streams::recv_remote_reset`).
pub(crate) enum StreamEndState {
    /// The stream was active and has now been closed by the reset.
    OK,
    /// The reset has no effect that needs reporting (e.g. the stream was already closed).
    Ignore,
    /// The reset refers to a stream that is not tracked.
    Err(H2Error),
}
53 
54 //                              +--------+
55 //                      send PP |        | recv PP
56 //                     ,--------|  idle  |--------.
57 //                    /         |        |         \
58 //                   v          +--------+          v
59 //            +----------+          |           +----------+
60 //            |          |          | send H /  |          |
61 //     ,------| reserved |          | recv H    | reserved |------.
62 //     |      | (local)  |          |           | (remote) |      |
63 //     |      +----------+          v           +----------+      |
64 //     |          |             +--------+             |          |
65 //     |          |     recv ES |        | send ES     |          |
66 //     |   send H |     ,-------|  open  |-------.     | recv H   |
67 //     |          |    /        |        |        \    |          |
68 //     |          v   v         +--------+         v   v          |
69 //     |      +----------+          |           +----------+      |
70 //     |      |   half   |          |           |   half   |      |
71 //     |      |  closed  |          | send R /  |  closed  |      |
72 //     |      | (remote) |          | recv R    | (local)  |      |
73 //     |      +----------+          |           +----------+      |
74 //     |           |                |                 |           |
75 //     |           | send ES /      |       recv ES / |           |
76 //     |           | send R /       v        send R / |           |
77 //     |           | recv R     +--------+   recv R   |           |
78 //     | send R /  `----------->|        |<-----------'  send R / |
79 //     | recv R                 | closed |               recv R   |
80 //     `----------------------->|        |<----------------------'
81 //                              +--------+
/// Client-side view of the HTTP/2 stream state machine drawn above.
#[derive(Copy, Clone, Debug)]
pub(crate) enum H2StreamState {
    /// Stream has been created but no HEADERS frame has been sent or received yet.
    Idle,
    // When response does not depend on request,
    // the server can send response directly without waiting for the request to finish receiving.
    // Therefore, the sending and receiving states of the client have their own states
    Open {
        /// Progress of the sending (request) direction.
        send: ActiveState,
        /// Progress of the receiving (response) direction.
        recv: ActiveState,
    },
    /// Reserved by the remote peer; never constructed in this file, hence the
    /// `dead_code` allowance.
    #[allow(dead_code)]
    ReservedRemote,
    // After the request is sent, the state is waiting for the response to be received.
    // The payload tracks the progress of the receiving direction.
    LocalHalfClosed(ActiveState),
    // When the response is received but the request is not fully sent,
    // this indicates the status of the request being sent
    RemoteHalfClosed(ActiveState),
    /// The stream is finished; the payload records why.
    Closed(CloseReason),
}
101 
/// Why a stream ended up in `H2StreamState::Closed`.
#[derive(Copy, Clone, Debug)]
pub(crate) enum CloseReason {
    /// Closed by a locally issued reset (`Streams::send_local_reset`).
    LocalRst,
    /// Closed by a reset received from the peer (`Streams::recv_remote_reset`).
    RemoteRst,
    /// Closed because of a GOAWAY received from the peer (`Streams::get_go_away_streams`).
    RemoteGoAway,
    /// Closed while shutting down locally (`Streams::get_all_unclosed_streams`).
    LocalGoAway,
    /// Both directions finished with an end-stream flag.
    EndStream,
}
110 
/// Progress of one direction (sending or receiving) of an active stream.
#[derive(Copy, Clone, Debug)]
pub(crate) enum ActiveState {
    /// The HEADERS frame for this direction has not been processed yet.
    WaitHeaders,
    /// HEADERS have been processed; DATA frames are expected next.
    WaitData,
}
116 
/// Per-stream bookkeeping: flow-control windows, state machine position and
/// the request that still has to be sent.
pub(crate) struct Stream {
    /// Flow-control window for data received on this stream.
    pub(crate) recv_window: RecvWindow,
    /// Flow-control window for data sent on this stream.
    pub(crate) send_window: SendWindow,
    /// Current position in the stream state machine.
    pub(crate) state: H2StreamState,
    /// Pending HEADERS frame; taken by `Streams::headers` and dropped when the
    /// stream is closed early.
    pub(crate) header: Option<Frame>,
    /// Source of the request body, polled by `Streams::poll_read_body`.
    pub(crate) data: BodyDataRef,
}
124 
/// Bundle of a frame's flags and payload together with a body reference.
// NOTE(review): not used in this file; presumably the form in which a request
// is handed to the dispatcher — confirm against callers.
pub(crate) struct RequestWrapper {
    /// Frame flags.
    pub(crate) flag: FrameFlags,
    /// Frame payload.
    pub(crate) payload: Payload,
    /// Body data reference.
    pub(crate) data: BodyDataRef,
}
130 
/// Connection-wide registry of streams plus the queues that schedule them.
pub(crate) struct Streams {
    // Records the received goaway last_stream_id.
    pub(crate) max_send_id: StreamId,
    // Records the send goaway last_stream_id.
    pub(crate) max_recv_id: StreamId,
    // Currently the client doesn't support push promise, so this value is always 0.
    pub(crate) latest_remote_id: StreamId,
    // Initial receive window size given to newly inserted streams.
    pub(crate) stream_recv_window_size: u32,
    // Initial send window size given to newly inserted streams.
    pub(crate) stream_send_window_size: u32,
    // Upper limit compared against `current_concurrent_streams`.
    max_concurrent_streams: u32,
    // Number of streams currently counted against the concurrency limit.
    current_concurrent_streams: u32,
    // Connection-level flow control (send and receive).
    flow_control: FlowControl,
    // Streams waiting for a free concurrency slot.
    pending_concurrency: VecDeque<StreamId>,
    // Streams whose own send window had no room when their body was polled.
    pending_stream_window: HashSet<u32>,
    // Streams that found the connection send window exhausted when polled.
    pending_conn_window: VecDeque<u32>,
    // Streams ready to be polled for sending.
    pending_send: VecDeque<StreamId>,
    // Streams whose receive window should be checked for a window update.
    window_updating_streams: VecDeque<StreamId>,
    // All tracked streams, keyed by stream id.
    pub(crate) stream_map: HashMap<StreamId, Stream>,
    // Next id returned by `generate_id`; starts at 1 and advances by 2.
    pub(crate) next_stream_id: StreamId,
}
151 
// Assigns the next stream state after a HEADERS frame has been received.
// `$eos` tells whether the frame carried the end-stream flag, `$state` is the
// place to assign to, and `$send` (Open arm only) is the current progress of
// the sending direction, which is carried over unchanged.
macro_rules! change_stream_state {
    // Idle stream: the receiving side either ends immediately or starts
    // waiting for data; the sending side has not sent headers yet.
    (Idle: $eos: expr, $state: expr) => {
        $state = if $eos {
            H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders)
        } else {
            H2StreamState::Open {
                send: ActiveState::WaitHeaders,
                recv: ActiveState::WaitData,
            }
        };
    };
    // Open stream: keep the sending progress, advance the receiving side.
    (Open: $eos: expr, $state: expr, $send: expr) => {
        $state = if $eos {
            H2StreamState::RemoteHalfClosed($send.clone())
        } else {
            H2StreamState::Open {
                send: $send.clone(),
                recv: ActiveState::WaitData,
            }
        };
    };
    // Sending side already finished: end-stream closes the stream, otherwise
    // the stream keeps waiting for data.
    (HalfClosed: $eos: expr, $state: expr) => {
        $state = if $eos {
            H2StreamState::Closed(CloseReason::EndStream)
        } else {
            H2StreamState::LocalHalfClosed(ActiveState::WaitData)
        };
    };
}
181 
182 impl Streams {
    /// Creates an empty stream registry.
    ///
    /// `recv_window_size` and `send_window_size` become the initial per-stream
    /// window sizes used by `insert`; `flow_control` is the connection-level
    /// flow controller. Stream ids start at 1 and the concurrency limit starts
    /// at `DEFAULT_MAX_CONCURRENT_STREAMS`.
    pub(crate) fn new(
        recv_window_size: u32,
        send_window_size: u32,
        flow_control: FlowControl,
    ) -> Self {
        Self {
            max_send_id: INITIAL_MAX_SEND_STREAM_ID,
            max_recv_id: INITIAL_MAX_RECV_STREAM_ID,
            latest_remote_id: INITIAL_LATEST_REMOTE_ID,
            max_concurrent_streams: DEFAULT_MAX_CONCURRENT_STREAMS,
            current_concurrent_streams: 0,
            stream_recv_window_size: recv_window_size,
            stream_send_window_size: send_window_size,
            flow_control,
            pending_concurrency: VecDeque::new(),
            pending_stream_window: HashSet::new(),
            pending_conn_window: VecDeque::new(),
            pending_send: VecDeque::new(),
            window_updating_streams: VecDeque::new(),
            stream_map: HashMap::new(),
            next_stream_id: 1,
        }
    }
206 
207     pub(crate) fn decrease_current_concurrency(&mut self) {
208         self.current_concurrent_streams -= 1;
209     }
210 
    /// Occupies one concurrency slot for a stream that is about to be sent.
    pub(crate) fn increase_current_concurrency(&mut self) {
        self.current_concurrent_streams += 1;
    }
214 
    /// Returns `true` when the number of counted streams has reached (or
    /// exceeds) the current concurrency limit.
    pub(crate) fn reach_max_concurrency(&mut self) -> bool {
        self.current_concurrent_streams >= self.max_concurrent_streams
    }
218 
    /// Replaces the concurrency limit with `num`.
    ///
    /// Streams already counted are not affected; the new limit only changes
    /// the result of later `reach_max_concurrency` checks.
    pub(crate) fn apply_max_concurrent_streams(&mut self, num: u32) {
        self.max_concurrent_streams = num;
    }
222 
223     pub(crate) fn apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error> {
224         let current = self.stream_send_window_size;
225         self.stream_send_window_size = size;
226 
227         match current.cmp(&size) {
228             Ordering::Less => {
229                 let excess = size - current;
230                 for (_id, stream) in self.stream_map.iter_mut() {
231                     stream.send_window.increase_size(excess)?;
232                 }
233                 for id in self.pending_stream_window.iter() {
234                     self.pending_send.push_back(*id);
235                 }
236                 self.pending_stream_window.clear();
237             }
238             Ordering::Greater => {
239                 let excess = current - size;
240                 for (_id, stream) in self.stream_map.iter_mut() {
241                     stream.send_window.reduce_size(excess);
242                 }
243             }
244             Ordering::Equal => {}
245         }
246         Ok(())
247     }
248 
249     pub(crate) fn apply_recv_initial_window_size(&mut self, size: u32) {
250         let current = self.stream_recv_window_size;
251         self.stream_recv_window_size = size;
252         match current.cmp(&size) {
253             Ordering::Less => {
254                 for (_id, stream) in self.stream_map.iter_mut() {
255                     let extra = size - current;
256                     stream.recv_window.increase_notification(extra);
257                     stream.recv_window.increase_actual(extra);
258                 }
259             }
260             Ordering::Greater => {
261                 for (_id, stream) in self.stream_map.iter_mut() {
262                     stream.recv_window.reduce_notification(current - size);
263                 }
264             }
265             Ordering::Equal => {}
266         }
267     }
268 
269     pub(crate) fn release_stream_recv_window(
270         &mut self,
271         id: StreamId,
272         size: u32,
273     ) -> Result<(), H2Error> {
274         if let Some(stream) = self.stream_map.get_mut(&id) {
275             if stream.recv_window.notification_available() < size {
276                 return Err(H2Error::StreamError(id, ErrorCode::FlowControlError));
277             }
278             stream.recv_window.recv_data(size);
279             if stream.recv_window.unreleased_size().is_some() {
280                 self.window_updating_streams.push_back(id);
281             }
282         }
283         Ok(())
284     }
285 
286     pub(crate) fn release_conn_recv_window(&mut self, size: u32) -> Result<(), H2Error> {
287         if self.flow_control.recv_notification_size_available() < size {
288             return Err(H2Error::ConnectionError(ErrorCode::FlowControlError));
289         }
290         self.flow_control.recv_data(size);
291         Ok(())
292     }
293 
294     pub(crate) fn is_closed(&self) -> bool {
295         for (_id, stream) in self.stream_map.iter() {
296             match stream.state {
297                 H2StreamState::Closed(_) => {}
298                 _ => {
299                     return false;
300                 }
301             }
302         }
303         true
304     }
305 
    /// Returns a copy of the state of stream `id`, or `None` if the stream is
    /// not tracked.
    pub(crate) fn stream_state(&self, id: StreamId) -> Option<H2StreamState> {
        self.stream_map.get(&id).map(|stream| stream.state)
    }
309 
310     pub(crate) fn insert(&mut self, id: StreamId, headers: Frame, data: BodyDataRef) {
311         let send_window = SendWindow::new(self.stream_send_window_size as i32);
312         let recv_window = RecvWindow::new(self.stream_recv_window_size as i32);
313         let stream = Stream::new(recv_window, send_window, headers, data);
314         self.stream_map.insert(id, stream);
315     }
316 
    /// Appends stream `id` to the queue of streams ready to be sent.
    pub(crate) fn push_back_pending_send(&mut self, id: StreamId) {
        self.pending_send.push_back(id);
    }
320 
    /// Appends stream `id` to the queue of streams waiting for a concurrency
    /// slot (drained by `try_consume_pending_concurrency`).
    pub(crate) fn push_pending_concurrency(&mut self, id: StreamId) {
        self.pending_concurrency.push_back(id);
    }
324 
    /// Returns `true` when no stream is waiting for a concurrency slot.
    pub(crate) fn is_pending_concurrency_empty(&self) -> bool {
        self.pending_concurrency.is_empty()
    }
328 
    /// Removes and returns the id at the front of the pending-send queue, or
    /// `None` when the queue is empty.
    pub(crate) fn next_pending_stream(&mut self) -> Option<StreamId> {
        self.pending_send.pop_front()
    }
332 
    /// Returns the number of entries in the pending-send queue.
    pub(crate) fn pending_stream_num(&self) -> usize {
        self.pending_send.len()
    }
336 
337     pub(crate) fn try_consume_pending_concurrency(&mut self) {
338         while !self.reach_max_concurrency() {
339             match self.pending_concurrency.pop_front() {
340                 None => {
341                     return;
342                 }
343                 Some(id) => {
344                     self.increase_current_concurrency();
345                     self.push_back_pending_send(id);
346                 }
347             }
348         }
349     }
350 
    /// Enlarges the connection-level send window by `size`.
    ///
    /// # Errors
    ///
    /// Returns whatever `FlowControl::increase_send_size` reports.
    pub(crate) fn increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error> {
        self.flow_control.increase_send_size(size)
    }
354 
355     pub(crate) fn reassign_conn_send_window(&mut self) {
356         // Since the data structure of the body is a stream,
357         // the size of a body cannot be obtained,
358         // so all streams in pending_conn_window are added to the pending_send queue
359         // again.
360         loop {
361             match self.pending_conn_window.pop_front() {
362                 None => break,
363                 Some(id) => {
364                     self.push_back_pending_send(id);
365                 }
366             }
367         }
368     }
369 
370     pub(crate) fn reassign_stream_send_window(
371         &mut self,
372         id: StreamId,
373         size: u32,
374     ) -> Result<(), H2Error> {
375         if let Some(stream) = self.stream_map.get_mut(&id) {
376             stream.send_window.increase_size(size)?;
377         }
378         if self.pending_stream_window.take(&id).is_some() {
379             self.pending_send.push_back(id);
380         }
381         Ok(())
382     }
383 
384     pub(crate) fn window_update_conn(
385         &mut self,
386         sender: &UnboundedSender<Frame>,
387     ) -> Result<(), DispatchErrorKind> {
388         if let Some(window_update) = self.flow_control.check_conn_recv_window_update() {
389             sender
390                 .send(window_update)
391                 .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
392         }
393         Ok(())
394     }
395 
396     pub(crate) fn window_update_streams(
397         &mut self,
398         sender: &UnboundedSender<Frame>,
399     ) -> Result<(), DispatchErrorKind> {
400         loop {
401             match self.window_updating_streams.pop_front() {
402                 None => return Ok(()),
403                 Some(id) => {
404                     if let Some(stream) = self.stream_map.get_mut(&id) {
405                         if !stream.is_init_or_active_flow_control() {
406                             return Ok(());
407                         }
408                         if let Some(window_update) = stream.recv_window.check_window_update(id) {
409                             sender
410                                 .send(window_update)
411                                 .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
412                         }
413                     }
414                 }
415             }
416         }
417     }
418 
419     pub(crate) fn headers(&mut self, id: StreamId) -> Result<Option<Frame>, H2Error> {
420         match self.stream_map.get_mut(&id) {
421             None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),
422             Some(stream) => match stream.state {
423                 H2StreamState::Closed(_) => Ok(None),
424                 _ => Ok(stream.header.take()),
425             },
426         }
427     }
428 
429     pub(crate) fn poll_read_body(
430         &mut self,
431         cx: &mut Context<'_>,
432         id: StreamId,
433     ) -> Result<DataReadState, H2Error> {
434         // TODO Since the Array length needs to be a constant,
435         // the minimum value is used here, which can be optimized to the MAX_FRAME_SIZE
436         // updated in SETTINGS
437         const DEFAULT_MAX_FRAME_SIZE: usize = 16 * 1024;
438 
439         match self.stream_map.get_mut(&id) {
440             None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),
441             Some(stream) => match stream.state {
442                 H2StreamState::Closed(_) => Ok(DataReadState::Closed),
443                 _ => {
444                     let stream_send_vacant = stream.send_window.size_available() as usize;
445                     if stream_send_vacant == 0 {
446                         self.pending_stream_window.insert(id);
447                         return Ok(DataReadState::Pending);
448                     }
449                     let conn_send_vacant = self.flow_control.send_size_available();
450                     if conn_send_vacant == 0 {
451                         self.pending_conn_window.push_back(id);
452                         return Ok(DataReadState::Pending);
453                     }
454 
455                     let available = min(stream_send_vacant, conn_send_vacant);
456                     let len = min(available, DEFAULT_MAX_FRAME_SIZE);
457 
458                     let mut buf = [0u8; DEFAULT_MAX_FRAME_SIZE];
459                     self.poll_sized_data(cx, id, &mut buf[..len])
460                 }
461             },
462         }
463     }
464 
poll_sized_datanull465     fn poll_sized_data(
466         &mut self,
467         cx: &mut Context<'_>,
468         id: StreamId,
469         buf: &mut [u8],
470     ) -> Result<DataReadState, H2Error> {
471         let stream = if let Some(stream) = self.stream_map.get_mut(&id) {
472             stream
473         } else {
474             return Err(H2Error::ConnectionError(ErrorCode::IntervalError));
475         };
476         match stream.data.poll_read(cx, buf) {
477             Poll::Ready(Some(size)) => {
478                 if size > 0 {
479                     stream.send_window.send_data(size as u32);
480                     self.flow_control.send_data(size as u32);
481                     let data_vec = Vec::from(&buf[..size]);
482                     let flag = FrameFlags::new(0);
483 
484                     Ok(DataReadState::Ready(Frame::new(
485                         id,
486                         flag,
487                         Payload::Data(Data::new(data_vec)),
488                     )))
489                 } else {
490                     let data_vec = vec![];
491                     let mut flag = FrameFlags::new(1);
492                     flag.set_end_stream(true);
493                     Ok(DataReadState::Finish(Frame::new(
494                         id,
495                         flag,
496                         Payload::Data(Data::new(data_vec)),
497                     )))
498                 }
499             }
500             Poll::Ready(None) => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),
501             Poll::Pending => {
502                 self.push_back_pending_send(id);
503                 Ok(DataReadState::Pending)
504             }
505         }
506     }
507 
508     pub(crate) fn get_go_away_streams(&mut self, last_stream_id: StreamId) -> Vec<StreamId> {
509         let mut ids = vec![];
510         for (id, unsent_stream) in self.stream_map.iter_mut() {
511             if *id >= last_stream_id {
512                 match unsent_stream.state {
513                     // TODO Whether the close state needs to be selected.
514                     H2StreamState::Closed(_) => {}
515                     H2StreamState::Idle => {
516                         unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway);
517                         unsent_stream.header = None;
518                         unsent_stream.data.clear();
519                     }
520                     _ => {
521                         self.current_concurrent_streams -= 1;
522                         unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway);
523                         unsent_stream.header = None;
524                         unsent_stream.data.clear();
525                     }
526                 };
527                 ids.push(*id);
528             }
529         }
530         ids
531     }
532 
533     pub(crate) fn get_all_unclosed_streams(&mut self) -> Vec<StreamId> {
534         let mut ids = vec![];
535         for (id, stream) in self.stream_map.iter_mut() {
536             match stream.state {
537                 H2StreamState::Closed(_) => {}
538                 _ => {
539                     stream.header = None;
540                     stream.data.clear();
541                     stream.state = H2StreamState::Closed(CloseReason::LocalGoAway);
542                     ids.push(*id);
543                 }
544             }
545         }
546         ids
547     }
548 
    /// Empties all scheduling queues (window updates, window waiters, pending
    /// sends and concurrency waiters). The stream map itself is left untouched.
    pub(crate) fn clear_streams_states(&mut self) {
        self.window_updating_streams.clear();
        self.pending_stream_window.clear();
        self.pending_send.clear();
        self.pending_conn_window.clear();
        self.pending_concurrency.clear();
    }
556 
557     pub(crate) fn send_local_reset(&mut self, id: StreamId) -> StreamEndState {
558         return match self.stream_map.get_mut(&id) {
559             None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
560             Some(stream) => match stream.state {
561                 H2StreamState::Closed(
562                     CloseReason::LocalRst
563                     | CloseReason::LocalGoAway
564                     | CloseReason::RemoteRst
565                     | CloseReason::RemoteGoAway,
566                 ) => StreamEndState::Ignore,
567                 H2StreamState::Closed(CloseReason::EndStream) => {
568                     stream.state = H2StreamState::Closed(CloseReason::LocalRst);
569                     StreamEndState::Ignore
570                 }
571                 _ => {
572                     stream.state = H2StreamState::Closed(CloseReason::LocalRst);
573                     stream.header = None;
574                     stream.data.clear();
575                     self.decrease_current_concurrency();
576                     StreamEndState::OK
577                 }
578             },
579         };
580     }
581 
582     pub(crate) fn send_headers_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
583         match self.stream_map.get_mut(&id) {
584             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
585             Some(stream) => match &stream.state {
586                 H2StreamState::Idle => {
587                     stream.state = if eos {
588                         H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders)
589                     } else {
590                         H2StreamState::Open {
591                             send: ActiveState::WaitData,
592                             recv: ActiveState::WaitHeaders,
593                         }
594                     };
595                 }
596                 H2StreamState::Open {
597                     send: ActiveState::WaitHeaders,
598                     recv,
599                 } => {
600                     stream.state = if eos {
601                         H2StreamState::LocalHalfClosed(*recv)
602                     } else {
603                         H2StreamState::Open {
604                             send: ActiveState::WaitData,
605                             recv: *recv,
606                         }
607                     };
608                 }
609                 H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders) => {
610                     stream.state = if eos {
611                         self.current_concurrent_streams -= 1;
612                         H2StreamState::Closed(CloseReason::EndStream)
613                     } else {
614                         H2StreamState::RemoteHalfClosed(ActiveState::WaitData)
615                     };
616                 }
617                 _ => {
618                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
619                 }
620             },
621         }
622         FrameRecvState::OK
623     }
624 
625     pub(crate) fn send_data_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
626         match self.stream_map.get_mut(&id) {
627             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
628             Some(stream) => match &stream.state {
629                 H2StreamState::Open {
630                     send: ActiveState::WaitData,
631                     recv,
632                 } => {
633                     if eos {
634                         stream.state = H2StreamState::LocalHalfClosed(*recv);
635                     }
636                 }
637                 H2StreamState::RemoteHalfClosed(ActiveState::WaitData) => {
638                     if eos {
639                         self.current_concurrent_streams -= 1;
640                         stream.state = H2StreamState::Closed(CloseReason::EndStream);
641                     }
642                 }
643                 _ => {
644                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
645                 }
646             },
647         }
648         FrameRecvState::OK
649     }
650 
651     pub(crate) fn recv_remote_reset(&mut self, id: StreamId) -> StreamEndState {
652         if id > self.max_recv_id {
653             return StreamEndState::Ignore;
654         }
655         return match self.stream_map.get_mut(&id) {
656             None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
657             Some(stream) => match stream.state {
658                 H2StreamState::Closed(..) => StreamEndState::Ignore,
659                 _ => {
660                     stream.state = H2StreamState::Closed(CloseReason::RemoteRst);
661                     stream.header = None;
662                     stream.data.clear();
663                     self.decrease_current_concurrency();
664                     StreamEndState::OK
665                 }
666             },
667         };
668     }
669 
670     pub(crate) fn recv_headers(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
671         if id > self.max_recv_id {
672             return FrameRecvState::Ignore;
673         }
674 
675         match self.stream_map.get_mut(&id) {
676             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
677             Some(stream) => match &stream.state {
678                 H2StreamState::Idle => {
679                     change_stream_state!(Idle: eos, stream.state);
680                 }
681                 H2StreamState::ReservedRemote => {
682                     change_stream_state!(HalfClosed: eos, stream.state);
683                     if eos {
684                         self.decrease_current_concurrency();
685                     }
686                 }
687                 H2StreamState::Open {
688                     send,
689                     recv: ActiveState::WaitHeaders,
690                 } => {
691                     change_stream_state!(Open: eos, stream.state, send);
692                 }
693                 H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders) => {
694                     change_stream_state!(HalfClosed: eos, stream.state);
695                     if eos {
696                         self.decrease_current_concurrency();
697                     }
698                 }
699                 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => {
700                     return FrameRecvState::Ignore;
701                 }
702                 _ => {
703                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
704                 }
705             },
706         }
707         FrameRecvState::OK
708     }
709 
710     pub(crate) fn recv_data(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
711         if id > self.max_recv_id {
712             return FrameRecvState::Ignore;
713         }
714         match self.stream_map.get_mut(&id) {
715             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
716             Some(stream) => match &stream.state {
717                 H2StreamState::Open {
718                     send,
719                     recv: ActiveState::WaitData,
720                 } => {
721                     if eos {
722                         stream.state = H2StreamState::RemoteHalfClosed(*send);
723                     }
724                 }
725                 H2StreamState::LocalHalfClosed(ActiveState::WaitData) => {
726                     if eos {
727                         stream.state = H2StreamState::Closed(CloseReason::EndStream);
728                         self.decrease_current_concurrency();
729                     }
730                 }
731                 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => {
732                     return FrameRecvState::Ignore;
733                 }
734                 _ => {
735                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
736                 }
737             },
738         }
739         FrameRecvState::OK
740     }
741 
742     pub(crate) fn generate_id(&mut self) -> Result<StreamId, DispatchErrorKind> {
743         let id = self.next_stream_id;
744         if self.next_stream_id < DEFAULT_MAX_STREAM_ID {
745             self.next_stream_id += 2;
746             Ok(id)
747         } else {
748             Err(DispatchErrorKind::H2(H2Error::ConnectionError(
749                 ErrorCode::ProtocolError,
750             )))
751         }
752     }
753 }
754 
755 impl Stream {
756     pub(crate) fn new(
757         recv_window: RecvWindow,
758         send_window: SendWindow,
759         headers: Frame,
760         data: BodyDataRef,
761     ) -> Self {
762         Self {
763             recv_window,
764             send_window,
765             state: H2StreamState::Idle,
766             header: Some(headers),
767             data,
768         }
769     }
770 
771     pub(crate) fn is_init_or_active_flow_control(&self) -> bool {
772         matches!(
773             self.state,
774             H2StreamState::Idle
775                 | H2StreamState::Open {
776                     recv: ActiveState::WaitData,
777                     ..
778                 }
779                 | H2StreamState::LocalHalfClosed(ActiveState::WaitData)
780         )
781     }
782 }
783