1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::future::Future; 15 use std::pin::Pin; 16 use std::sync::{Arc, Mutex}; 17 use std::task::{Context, Poll}; 18 19 use ylong_runtime::time::{sleep, Sleep}; 20 21 use crate::async_impl::{ConnInfo, QuicConn}; 22 use crate::runtime::{AsyncRead, AsyncWrite, ReadBuf, UnboundedReceiver, UnboundedSender}; 23 use crate::util::dispatcher::http3::DispatchErrorKind; 24 use crate::util::h3::stream_manager::UPD_RECV_BUF_SIZE; 25 26 const UDP_SEND_BUF_SIZE: usize = 1350; 27 28 enum IOManagerState { 29 IORecving, 30 Timeout, 31 IOSending, 32 ChannelRecving, 33 } 34 35 pub(crate) struct IOManager<S> { 36 io: S, 37 conn: Arc<Mutex<QuicConn>>, 38 io_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>, 39 stream_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>, 40 recv_timeout: Option<Pin<Box<Sleep>>>, 41 state: IOManagerState, 42 recv_buf: [u8; UPD_RECV_BUF_SIZE], 43 send_data: SendData, 44 } 45 46 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Sync + Send + 'static> IOManager<S> { 47 pub(crate) fn new( 48 io: S, 49 conn: Arc<Mutex<QuicConn>>, 50 io_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>, 51 stream_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>, 52 ) -> Self { 53 Self { 54 io, 55 conn, 56 io_manager_rx, 57 stream_manager_tx, 58 recv_timeout: None, 59 state: IOManagerState::IORecving, 60 recv_buf: [0u8; UPD_RECV_BUF_SIZE], 61 send_data: SendData::new(), 62 } 63 } poll_recv_signalnull64 fn poll_recv_signal( 65 &mut self, 66 cx: &mut Context<'_>, 67 ) -> Poll<Result<Result<(), DispatchErrorKind>, DispatchErrorKind>> { 68 #[cfg(feature = "tokio_base")] 69 match self.io_manager_rx.poll_recv(cx) { 70 Poll::Ready(None) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)), 71 Poll::Ready(Some(data)) => Poll::Ready(Ok(data)), 72 Poll::Pending => Poll::Pending, 73 } 74 #[cfg(feature = "ylong_base")] 75 match self.io_manager_rx.poll_recv(cx) { 76 Poll::Ready(Err(_e)) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)), 77 Poll::Ready(Ok(data)) => Poll::Ready(Ok(data)), 78 Poll::Pending => Poll::Pending, 79 } 80 } 81 poll_io_recvnull82 fn poll_io_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>> { 83 let mut buf = ReadBuf::new(&mut self.recv_buf); 84 if self.recv_timeout.is_none() { 85 if let Some(time) = self.conn.lock().unwrap().timeout() { 86 self.recv_timeout = Some(Box::pin(sleep(time))); 87 }; 88 } 89 90 if let Some(delay) = self.recv_timeout.as_mut() { 91 if let Poll::Ready(()) = delay.as_mut().poll(cx) { 92 self.recv_timeout = None; 93 self.conn.lock().unwrap().on_timeout(); 94 self.state = IOManagerState::Timeout; 95 return Poll::Ready(Ok(())); 96 } 97 } 98 match Pin::new(&mut self.io).poll_read(cx, &mut buf) { 99 Poll::Ready(Ok(())) => { 100 let info = self.io.conn_detail(); 101 self.recv_timeout = None; 102 let recv_info = quiche::RecvInfo { 103 to: info.local, 104 from: info.peer, 105 }; 106 return match self.conn.lock().unwrap().recv(buf.filled_mut(), recv_info) { 107 Ok(_) => { 108 let _ = self.stream_manager_tx.send(Ok(())); 109 // io recv once again 110 Poll::Ready(Ok(())) 111 } 112 Err(e) => Poll::Ready(Err(DispatchErrorKind::Quic(e))), 113 }; 114 } 115 Poll::Ready(Err(e)) => Poll::Ready(Err(DispatchErrorKind::Io(e.kind()))), 116 Poll::Pending => { 117 self.state = IOManagerState::IOSending; 118 Poll::Pending 119 } 120 } 121 } 122 poll_io_sendnull123 fn poll_io_send(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>> { 124 loop { 125 // UDP buf has not been sent to the peer, send rest UDP buf first 126 if self.send_data.buf_size == self.send_data.offset { 127 // Retrieve the data to be sent via UDP from the connection 128 let size = match self.conn.lock().unwrap().send(&mut self.send_data.buf) { 129 Ok((size, _)) => size, 130 Err(quiche::Error::Done) => { 131 self.state = IOManagerState::ChannelRecving; 132 return Poll::Ready(Ok(())); 133 } 134 Err(e) => { 135 return Poll::Ready(Err(DispatchErrorKind::Quic(e))); 136 } 137 }; 138 self.send_data.buf_size = size; 139 self.send_data.offset = 0; 140 } 141 142 match Pin::new(&mut self.io).poll_write( 143 cx, 144 &self.send_data.buf[self.send_data.offset..self.send_data.buf_size], 145 ) { 146 Poll::Ready(Ok(size)) => { 147 self.send_data.offset += size; 148 if self.send_data.offset != self.send_data.buf_size { 149 // loop to send UDP buf 150 continue; 151 } else { 152 self.send_data.offset = 0; 153 self.send_data.buf_size = 0; 154 } 155 } 156 Poll::Ready(Err(e)) => { 157 return Poll::Ready(Err(DispatchErrorKind::Io(e.kind()))); 158 } 159 Poll::Pending => { 160 self.state = IOManagerState::ChannelRecving; 161 return Poll::Pending; 162 } 163 } 164 } 165 } 166 } 167 168 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Sync + Send + 'static> Future for IOManager<S> { 169 type Output = Result<(), DispatchErrorKind>; 170 pollnull171 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 172 let this = self.get_mut(); 173 this.state = IOManagerState::IORecving; 174 loop { 175 match this.state { 176 IOManagerState::IORecving => { 177 if let Poll::Ready(Err(e)) = this.poll_io_recv(cx) { 178 return Poll::Ready(Err(e)); 179 } 180 } 181 IOManagerState::IOSending => { 182 if let Poll::Ready(Err(e)) = this.poll_io_send(cx) { 183 return Poll::Ready(Err(e)); 184 } 185 } 186 IOManagerState::Timeout => { 187 if let Poll::Ready(Err(e)) = this.poll_io_send(cx) { 188 return Poll::Ready(Err(e)); 189 } 190 // ensure pending at io recv 191 this.state = IOManagerState::IORecving; 192 } 193 IOManagerState::ChannelRecving => match this.poll_recv_signal(cx) { 194 // won't recv Err now 195 Poll::Ready(Ok(_)) => { 196 continue; 197 } 198 Poll::Ready(Err(e)) => { 199 return Poll::Ready(Err(e)); 200 } 201 Poll::Pending => { 202 this.state = IOManagerState::IORecving; 203 return Poll::Pending; 204 } 205 }, 206 } 207 } 208 } 209 } 210 211 pub(crate) struct SendData { 212 pub(crate) buf: [u8; UDP_SEND_BUF_SIZE], 213 pub(crate) buf_size: usize, 214 pub(crate) offset: usize, 215 } 216 217 impl SendData { 218 pub(crate) fn new() -> Self { 219 Self { 220 buf: [0u8; UDP_SEND_BUF_SIZE], 221 buf_size: 0, 222 offset: 0, 223 } 224 } 225 } 226