1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! Streams operations utils. 15 16 use std::cmp::{min, Ordering}; 17 use std::collections::{HashMap, HashSet, VecDeque}; 18 use std::task::{Context, Poll}; 19 20 use ylong_http::h2::{Data, ErrorCode, Frame, FrameFlags, H2Error, Payload, StreamId}; 21 22 use crate::runtime::UnboundedSender; 23 use crate::util::data_ref::BodyDataRef; 24 use crate::util::dispatcher::http2::DispatchErrorKind; 25 use crate::util::h2::buffer::{FlowControl, RecvWindow, SendWindow}; 26 27 pub(crate) const INITIAL_MAX_SEND_STREAM_ID: StreamId = u32::MAX >> 1; 28 pub(crate) const INITIAL_MAX_RECV_STREAM_ID: StreamId = u32::MAX >> 1; 29 30 const DEFAULT_MAX_STREAM_ID: StreamId = u32::MAX >> 1; 31 const INITIAL_LATEST_REMOTE_ID: StreamId = 0; 32 const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 100; 33 34 pub(crate) enum FrameRecvState { 35 OK, 36 Ignore, 37 Err(H2Error), 38 } 39 40 pub(crate) enum DataReadState { 41 Closed, 42 // Wait for poll_read or wait for window. 43 Pending, 44 Ready(Frame), 45 Finish(Frame), 46 } 47 48 pub(crate) enum StreamEndState { 49 OK, 50 Ignore, 51 Err(H2Error), 52 } 53 54 // +--------+ 55 // send PP | | recv PP 56 // ,--------| idle |--------. 57 // / | | \ 58 // v +--------+ v 59 // +----------+ | +----------+ 60 // | | | send H / | | 61 // ,------| reserved | | recv H | reserved |------. 62 // | | (local) | | | (remote) | | 63 // | +----------+ v +----------+ | 64 // | | +--------+ | | 65 // | | recv ES | | send ES | | 66 // | send H | ,-------| open |-------. | recv H | 67 // | | / | | \ | | 68 // | v v +--------+ v v | 69 // | +----------+ | +----------+ | 70 // | | half | | | half | | 71 // | | closed | | send R / | closed | | 72 // | | (remote) | | recv R | (local) | | 73 // | +----------+ | +----------+ | 74 // | | | | | 75 // | | send ES / | recv ES / | | 76 // | | send R / v send R / | | 77 // | | recv R +--------+ recv R | | 78 // | send R / `----------->| |<-----------' send R / | 79 // | recv R | closed | recv R | 80 // `----------------------->| |<----------------------' 81 // +--------+ 82 #[derive(Copy, Clone, Debug)] 83 pub(crate) enum H2StreamState { 84 Idle, 85 // When response does not depend on request, 86 // the server can send response directly without waiting for the request to finish receiving. 87 // Therefore, the sending and receiving states of the client have their own states 88 Open { 89 send: ActiveState, 90 recv: ActiveState, 91 }, 92 #[allow(dead_code)] 93 ReservedRemote, 94 // After the request is sent, the state is waiting for the response to be received. 95 LocalHalfClosed(ActiveState), 96 // When the response is received but the request is not fully sent, 97 // this indicates the status of the request being sent 98 RemoteHalfClosed(ActiveState), 99 Closed(CloseReason), 100 } 101 102 #[derive(Copy, Clone, Debug)] 103 pub(crate) enum CloseReason { 104 LocalRst, 105 RemoteRst, 106 RemoteGoAway, 107 LocalGoAway, 108 EndStream, 109 } 110 111 #[derive(Copy, Clone, Debug)] 112 pub(crate) enum ActiveState { 113 WaitHeaders, 114 WaitData, 115 } 116 117 pub(crate) struct Stream { 118 pub(crate) recv_window: RecvWindow, 119 pub(crate) send_window: SendWindow, 120 pub(crate) state: H2StreamState, 121 pub(crate) header: Option<Frame>, 122 pub(crate) data: BodyDataRef, 123 } 124 125 pub(crate) struct RequestWrapper { 126 pub(crate) flag: FrameFlags, 127 pub(crate) payload: Payload, 128 pub(crate) data: BodyDataRef, 129 } 130 131 pub(crate) struct Streams { 132 // Records the received goaway last_stream_id. 133 pub(crate) max_send_id: StreamId, 134 // Records the send goaway last_stream_id. 135 pub(crate) max_recv_id: StreamId, 136 // Currently the client doesn't support push promise, so this value is always 0. 137 pub(crate) latest_remote_id: StreamId, 138 pub(crate) stream_recv_window_size: u32, 139 pub(crate) stream_send_window_size: u32, 140 max_concurrent_streams: u32, 141 current_concurrent_streams: u32, 142 flow_control: FlowControl, 143 pending_concurrency: VecDeque<StreamId>, 144 pending_stream_window: HashSet<u32>, 145 pending_conn_window: VecDeque<u32>, 146 pending_send: VecDeque<StreamId>, 147 window_updating_streams: VecDeque<StreamId>, 148 pub(crate) stream_map: HashMap<StreamId, Stream>, 149 pub(crate) next_stream_id: StreamId, 150 } 151 152 macro_rules! change_stream_state { 153 (Idle: $eos: expr, $state: expr) => { 154 $state = if $eos { 155 H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders) 156 } else { 157 H2StreamState::Open { 158 send: ActiveState::WaitHeaders, 159 recv: ActiveState::WaitData, 160 } 161 }; 162 }; 163 (Open: $eos: expr, $state: expr, $send: expr) => { 164 $state = if $eos { 165 H2StreamState::RemoteHalfClosed($send.clone()) 166 } else { 167 H2StreamState::Open { 168 send: $send.clone(), 169 recv: ActiveState::WaitData, 170 } 171 }; 172 }; 173 (HalfClosed: $eos: expr, $state: expr) => { 174 $state = if $eos { 175 H2StreamState::Closed(CloseReason::EndStream) 176 } else { 177 H2StreamState::LocalHalfClosed(ActiveState::WaitData) 178 }; 179 }; 180 } 181 182 impl Streams { 183 pub(crate) fn new( 184 recv_window_size: u32, 185 send_window_size: u32, 186 flow_control: FlowControl, 187 ) -> Self { 188 Self { 189 max_send_id: INITIAL_MAX_SEND_STREAM_ID, 190 max_recv_id: INITIAL_MAX_RECV_STREAM_ID, 191 latest_remote_id: INITIAL_LATEST_REMOTE_ID, 192 max_concurrent_streams: DEFAULT_MAX_CONCURRENT_STREAMS, 193 current_concurrent_streams: 0, 194 stream_recv_window_size: recv_window_size, 195 stream_send_window_size: send_window_size, 196 flow_control, 197 pending_concurrency: VecDeque::new(), 198 pending_stream_window: HashSet::new(), 199 pending_conn_window: VecDeque::new(), 200 pending_send: VecDeque::new(), 201 window_updating_streams: VecDeque::new(), 202 stream_map: HashMap::new(), 203 next_stream_id: 1, 204 } 205 } 206 207 pub(crate) fn decrease_current_concurrency(&mut self) { 208 self.current_concurrent_streams -= 1; 209 } 210 211 pub(crate) fn increase_current_concurrency(&mut self) { 212 self.current_concurrent_streams += 1; 213 } 214 215 pub(crate) fn reach_max_concurrency(&mut self) -> bool { 216 self.current_concurrent_streams >= self.max_concurrent_streams 217 } 218 219 pub(crate) fn apply_max_concurrent_streams(&mut self, num: u32) { 220 self.max_concurrent_streams = num; 221 } 222 223 pub(crate) fn apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error> { 224 let current = self.stream_send_window_size; 225 self.stream_send_window_size = size; 226 227 match current.cmp(&size) { 228 Ordering::Less => { 229 let excess = size - current; 230 for (_id, stream) in self.stream_map.iter_mut() { 231 stream.send_window.increase_size(excess)?; 232 } 233 for id in self.pending_stream_window.iter() { 234 self.pending_send.push_back(*id); 235 } 236 self.pending_stream_window.clear(); 237 } 238 Ordering::Greater => { 239 let excess = current - size; 240 for (_id, stream) in self.stream_map.iter_mut() { 241 stream.send_window.reduce_size(excess); 242 } 243 } 244 Ordering::Equal => {} 245 } 246 Ok(()) 247 } 248 249 pub(crate) fn apply_recv_initial_window_size(&mut self, size: u32) { 250 let current = self.stream_recv_window_size; 251 self.stream_recv_window_size = size; 252 match current.cmp(&size) { 253 Ordering::Less => { 254 for (_id, stream) in self.stream_map.iter_mut() { 255 let extra = size - current; 256 stream.recv_window.increase_notification(extra); 257 stream.recv_window.increase_actual(extra); 258 } 259 } 260 Ordering::Greater => { 261 for (_id, stream) in self.stream_map.iter_mut() { 262 stream.recv_window.reduce_notification(current - size); 263 } 264 } 265 Ordering::Equal => {} 266 } 267 } 268 269 pub(crate) fn release_stream_recv_window( 270 &mut self, 271 id: StreamId, 272 size: u32, 273 ) -> Result<(), H2Error> { 274 if let Some(stream) = self.stream_map.get_mut(&id) { 275 if stream.recv_window.notification_available() < size { 276 return Err(H2Error::StreamError(id, ErrorCode::FlowControlError)); 277 } 278 stream.recv_window.recv_data(size); 279 if stream.recv_window.unreleased_size().is_some() { 280 self.window_updating_streams.push_back(id); 281 } 282 } 283 Ok(()) 284 } 285 286 pub(crate) fn release_conn_recv_window(&mut self, size: u32) -> Result<(), H2Error> { 287 if self.flow_control.recv_notification_size_available() < size { 288 return Err(H2Error::ConnectionError(ErrorCode::FlowControlError)); 289 } 290 self.flow_control.recv_data(size); 291 Ok(()) 292 } 293 294 pub(crate) fn is_closed(&self) -> bool { 295 for (_id, stream) in self.stream_map.iter() { 296 match stream.state { 297 H2StreamState::Closed(_) => {} 298 _ => { 299 return false; 300 } 301 } 302 } 303 true 304 } 305 306 pub(crate) fn stream_state(&self, id: StreamId) -> Option<H2StreamState> { 307 self.stream_map.get(&id).map(|stream| stream.state) 308 } 309 310 pub(crate) fn insert(&mut self, id: StreamId, headers: Frame, data: BodyDataRef) { 311 let send_window = SendWindow::new(self.stream_send_window_size as i32); 312 let recv_window = RecvWindow::new(self.stream_recv_window_size as i32); 313 let stream = Stream::new(recv_window, send_window, headers, data); 314 self.stream_map.insert(id, stream); 315 } 316 317 pub(crate) fn push_back_pending_send(&mut self, id: StreamId) { 318 self.pending_send.push_back(id); 319 } 320 321 pub(crate) fn push_pending_concurrency(&mut self, id: StreamId) { 322 self.pending_concurrency.push_back(id); 323 } 324 325 pub(crate) fn is_pending_concurrency_empty(&self) -> bool { 326 self.pending_concurrency.is_empty() 327 } 328 329 pub(crate) fn next_pending_stream(&mut self) -> Option<StreamId> { 330 self.pending_send.pop_front() 331 } 332 333 pub(crate) fn pending_stream_num(&self) -> usize { 334 self.pending_send.len() 335 } 336 337 pub(crate) fn try_consume_pending_concurrency(&mut self) { 338 while !self.reach_max_concurrency() { 339 match self.pending_concurrency.pop_front() { 340 None => { 341 return; 342 } 343 Some(id) => { 344 self.increase_current_concurrency(); 345 self.push_back_pending_send(id); 346 } 347 } 348 } 349 } 350 351 pub(crate) fn increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error> { 352 self.flow_control.increase_send_size(size) 353 } 354 355 pub(crate) fn reassign_conn_send_window(&mut self) { 356 // Since the data structure of the body is a stream, 357 // the size of a body cannot be obtained, 358 // so all streams in pending_conn_window are added to the pending_send queue 359 // again. 360 loop { 361 match self.pending_conn_window.pop_front() { 362 None => break, 363 Some(id) => { 364 self.push_back_pending_send(id); 365 } 366 } 367 } 368 } 369 370 pub(crate) fn reassign_stream_send_window( 371 &mut self, 372 id: StreamId, 373 size: u32, 374 ) -> Result<(), H2Error> { 375 if let Some(stream) = self.stream_map.get_mut(&id) { 376 stream.send_window.increase_size(size)?; 377 } 378 if self.pending_stream_window.take(&id).is_some() { 379 self.pending_send.push_back(id); 380 } 381 Ok(()) 382 } 383 384 pub(crate) fn window_update_conn( 385 &mut self, 386 sender: &UnboundedSender<Frame>, 387 ) -> Result<(), DispatchErrorKind> { 388 if let Some(window_update) = self.flow_control.check_conn_recv_window_update() { 389 sender 390 .send(window_update) 391 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 392 } 393 Ok(()) 394 } 395 396 pub(crate) fn window_update_streams( 397 &mut self, 398 sender: &UnboundedSender<Frame>, 399 ) -> Result<(), DispatchErrorKind> { 400 loop { 401 match self.window_updating_streams.pop_front() { 402 None => return Ok(()), 403 Some(id) => { 404 if let Some(stream) = self.stream_map.get_mut(&id) { 405 if !stream.is_init_or_active_flow_control() { 406 return Ok(()); 407 } 408 if let Some(window_update) = stream.recv_window.check_window_update(id) { 409 sender 410 .send(window_update) 411 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 412 } 413 } 414 } 415 } 416 } 417 } 418 419 pub(crate) fn headers(&mut self, id: StreamId) -> Result<Option<Frame>, H2Error> { 420 match self.stream_map.get_mut(&id) { 421 None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)), 422 Some(stream) => match stream.state { 423 H2StreamState::Closed(_) => Ok(None), 424 _ => Ok(stream.header.take()), 425 }, 426 } 427 } 428 429 pub(crate) fn poll_read_body( 430 &mut self, 431 cx: &mut Context<'_>, 432 id: StreamId, 433 ) -> Result<DataReadState, H2Error> { 434 // TODO Since the Array length needs to be a constant, 435 // the minimum value is used here, which can be optimized to the MAX_FRAME_SIZE 436 // updated in SETTINGS 437 const DEFAULT_MAX_FRAME_SIZE: usize = 16 * 1024; 438 439 match self.stream_map.get_mut(&id) { 440 None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)), 441 Some(stream) => match stream.state { 442 H2StreamState::Closed(_) => Ok(DataReadState::Closed), 443 _ => { 444 let stream_send_vacant = stream.send_window.size_available() as usize; 445 if stream_send_vacant == 0 { 446 self.pending_stream_window.insert(id); 447 return Ok(DataReadState::Pending); 448 } 449 let conn_send_vacant = self.flow_control.send_size_available(); 450 if conn_send_vacant == 0 { 451 self.pending_conn_window.push_back(id); 452 return Ok(DataReadState::Pending); 453 } 454 455 let available = min(stream_send_vacant, conn_send_vacant); 456 let len = min(available, DEFAULT_MAX_FRAME_SIZE); 457 458 let mut buf = [0u8; DEFAULT_MAX_FRAME_SIZE]; 459 self.poll_sized_data(cx, id, &mut buf[..len]) 460 } 461 }, 462 } 463 } 464 poll_sized_datanull465 fn poll_sized_data( 466 &mut self, 467 cx: &mut Context<'_>, 468 id: StreamId, 469 buf: &mut [u8], 470 ) -> Result<DataReadState, H2Error> { 471 let stream = if let Some(stream) = self.stream_map.get_mut(&id) { 472 stream 473 } else { 474 return Err(H2Error::ConnectionError(ErrorCode::IntervalError)); 475 }; 476 match stream.data.poll_read(cx, buf) { 477 Poll::Ready(Some(size)) => { 478 if size > 0 { 479 stream.send_window.send_data(size as u32); 480 self.flow_control.send_data(size as u32); 481 let data_vec = Vec::from(&buf[..size]); 482 let flag = FrameFlags::new(0); 483 484 Ok(DataReadState::Ready(Frame::new( 485 id, 486 flag, 487 Payload::Data(Data::new(data_vec)), 488 ))) 489 } else { 490 let data_vec = vec![]; 491 let mut flag = FrameFlags::new(1); 492 flag.set_end_stream(true); 493 Ok(DataReadState::Finish(Frame::new( 494 id, 495 flag, 496 Payload::Data(Data::new(data_vec)), 497 ))) 498 } 499 } 500 Poll::Ready(None) => Err(H2Error::ConnectionError(ErrorCode::IntervalError)), 501 Poll::Pending => { 502 self.push_back_pending_send(id); 503 Ok(DataReadState::Pending) 504 } 505 } 506 } 507 508 pub(crate) fn get_go_away_streams(&mut self, last_stream_id: StreamId) -> Vec<StreamId> { 509 let mut ids = vec![]; 510 for (id, unsent_stream) in self.stream_map.iter_mut() { 511 if *id >= last_stream_id { 512 match unsent_stream.state { 513 // TODO Whether the close state needs to be selected. 514 H2StreamState::Closed(_) => {} 515 H2StreamState::Idle => { 516 unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway); 517 unsent_stream.header = None; 518 unsent_stream.data.clear(); 519 } 520 _ => { 521 self.current_concurrent_streams -= 1; 522 unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway); 523 unsent_stream.header = None; 524 unsent_stream.data.clear(); 525 } 526 }; 527 ids.push(*id); 528 } 529 } 530 ids 531 } 532 533 pub(crate) fn get_all_unclosed_streams(&mut self) -> Vec<StreamId> { 534 let mut ids = vec![]; 535 for (id, stream) in self.stream_map.iter_mut() { 536 match stream.state { 537 H2StreamState::Closed(_) => {} 538 _ => { 539 stream.header = None; 540 stream.data.clear(); 541 stream.state = H2StreamState::Closed(CloseReason::LocalGoAway); 542 ids.push(*id); 543 } 544 } 545 } 546 ids 547 } 548 549 pub(crate) fn clear_streams_states(&mut self) { 550 self.window_updating_streams.clear(); 551 self.pending_stream_window.clear(); 552 self.pending_send.clear(); 553 self.pending_conn_window.clear(); 554 self.pending_concurrency.clear(); 555 } 556 557 pub(crate) fn send_local_reset(&mut self, id: StreamId) -> StreamEndState { 558 return match self.stream_map.get_mut(&id) { 559 None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 560 Some(stream) => match stream.state { 561 H2StreamState::Closed( 562 CloseReason::LocalRst 563 | CloseReason::LocalGoAway 564 | CloseReason::RemoteRst 565 | CloseReason::RemoteGoAway, 566 ) => StreamEndState::Ignore, 567 H2StreamState::Closed(CloseReason::EndStream) => { 568 stream.state = H2StreamState::Closed(CloseReason::LocalRst); 569 StreamEndState::Ignore 570 } 571 _ => { 572 stream.state = H2StreamState::Closed(CloseReason::LocalRst); 573 stream.header = None; 574 stream.data.clear(); 575 self.decrease_current_concurrency(); 576 StreamEndState::OK 577 } 578 }, 579 }; 580 } 581 582 pub(crate) fn send_headers_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState { 583 match self.stream_map.get_mut(&id) { 584 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 585 Some(stream) => match &stream.state { 586 H2StreamState::Idle => { 587 stream.state = if eos { 588 H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders) 589 } else { 590 H2StreamState::Open { 591 send: ActiveState::WaitData, 592 recv: ActiveState::WaitHeaders, 593 } 594 }; 595 } 596 H2StreamState::Open { 597 send: ActiveState::WaitHeaders, 598 recv, 599 } => { 600 stream.state = if eos { 601 H2StreamState::LocalHalfClosed(*recv) 602 } else { 603 H2StreamState::Open { 604 send: ActiveState::WaitData, 605 recv: *recv, 606 } 607 }; 608 } 609 H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders) => { 610 stream.state = if eos { 611 self.current_concurrent_streams -= 1; 612 H2StreamState::Closed(CloseReason::EndStream) 613 } else { 614 H2StreamState::RemoteHalfClosed(ActiveState::WaitData) 615 }; 616 } 617 _ => { 618 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 619 } 620 }, 621 } 622 FrameRecvState::OK 623 } 624 625 pub(crate) fn send_data_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState { 626 match self.stream_map.get_mut(&id) { 627 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 628 Some(stream) => match &stream.state { 629 H2StreamState::Open { 630 send: ActiveState::WaitData, 631 recv, 632 } => { 633 if eos { 634 stream.state = H2StreamState::LocalHalfClosed(*recv); 635 } 636 } 637 H2StreamState::RemoteHalfClosed(ActiveState::WaitData) => { 638 if eos { 639 self.current_concurrent_streams -= 1; 640 stream.state = H2StreamState::Closed(CloseReason::EndStream); 641 } 642 } 643 _ => { 644 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 645 } 646 }, 647 } 648 FrameRecvState::OK 649 } 650 651 pub(crate) fn recv_remote_reset(&mut self, id: StreamId) -> StreamEndState { 652 if id > self.max_recv_id { 653 return StreamEndState::Ignore; 654 } 655 return match self.stream_map.get_mut(&id) { 656 None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 657 Some(stream) => match stream.state { 658 H2StreamState::Closed(..) => StreamEndState::Ignore, 659 _ => { 660 stream.state = H2StreamState::Closed(CloseReason::RemoteRst); 661 stream.header = None; 662 stream.data.clear(); 663 self.decrease_current_concurrency(); 664 StreamEndState::OK 665 } 666 }, 667 }; 668 } 669 670 pub(crate) fn recv_headers(&mut self, id: StreamId, eos: bool) -> FrameRecvState { 671 if id > self.max_recv_id { 672 return FrameRecvState::Ignore; 673 } 674 675 match self.stream_map.get_mut(&id) { 676 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 677 Some(stream) => match &stream.state { 678 H2StreamState::Idle => { 679 change_stream_state!(Idle: eos, stream.state); 680 } 681 H2StreamState::ReservedRemote => { 682 change_stream_state!(HalfClosed: eos, stream.state); 683 if eos { 684 self.decrease_current_concurrency(); 685 } 686 } 687 H2StreamState::Open { 688 send, 689 recv: ActiveState::WaitHeaders, 690 } => { 691 change_stream_state!(Open: eos, stream.state, send); 692 } 693 H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders) => { 694 change_stream_state!(HalfClosed: eos, stream.state); 695 if eos { 696 self.decrease_current_concurrency(); 697 } 698 } 699 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => { 700 return FrameRecvState::Ignore; 701 } 702 _ => { 703 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 704 } 705 }, 706 } 707 FrameRecvState::OK 708 } 709 710 pub(crate) fn recv_data(&mut self, id: StreamId, eos: bool) -> FrameRecvState { 711 if id > self.max_recv_id { 712 return FrameRecvState::Ignore; 713 } 714 match self.stream_map.get_mut(&id) { 715 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 716 Some(stream) => match &stream.state { 717 H2StreamState::Open { 718 send, 719 recv: ActiveState::WaitData, 720 } => { 721 if eos { 722 stream.state = H2StreamState::RemoteHalfClosed(*send); 723 } 724 } 725 H2StreamState::LocalHalfClosed(ActiveState::WaitData) => { 726 if eos { 727 stream.state = H2StreamState::Closed(CloseReason::EndStream); 728 self.decrease_current_concurrency(); 729 } 730 } 731 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => { 732 return FrameRecvState::Ignore; 733 } 734 _ => { 735 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 736 } 737 }, 738 } 739 FrameRecvState::OK 740 } 741 742 pub(crate) fn generate_id(&mut self) -> Result<StreamId, DispatchErrorKind> { 743 let id = self.next_stream_id; 744 if self.next_stream_id < DEFAULT_MAX_STREAM_ID { 745 self.next_stream_id += 2; 746 Ok(id) 747 } else { 748 Err(DispatchErrorKind::H2(H2Error::ConnectionError( 749 ErrorCode::ProtocolError, 750 ))) 751 } 752 } 753 } 754 755 impl Stream { 756 pub(crate) fn new( 757 recv_window: RecvWindow, 758 send_window: SendWindow, 759 headers: Frame, 760 data: BodyDataRef, 761 ) -> Self { 762 Self { 763 recv_window, 764 send_window, 765 state: H2StreamState::Idle, 766 header: Some(headers), 767 data, 768 } 769 } 770 771 pub(crate) fn is_init_or_active_flow_control(&self) -> bool { 772 matches!( 773 self.state, 774 H2StreamState::Idle 775 | H2StreamState::Open { 776 recv: ActiveState::WaitData, 777 .. 778 } 779 | H2StreamState::LocalHalfClosed(ActiveState::WaitData) 780 ) 781 } 782 } 783