1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::future::Future;
15 use std::pin::Pin;
16 use std::sync::{Arc, Mutex};
17 use std::task::{Context, Poll};
18 
19 use ylong_runtime::time::{sleep, Sleep};
20 
21 use crate::async_impl::{ConnInfo, QuicConn};
22 use crate::runtime::{AsyncRead, AsyncWrite, ReadBuf, UnboundedReceiver, UnboundedSender};
23 use crate::util::dispatcher::http3::DispatchErrorKind;
24 use crate::util::h3::stream_manager::UPD_RECV_BUF_SIZE;
25 
// Capacity in bytes of `SendData::buf`, the buffer that receives one outgoing
// packet from the QUIC connection before it is written to the transport.
const UDP_SEND_BUF_SIZE: usize = 1350;
27 
/// Step the `IOManager` future is currently executing inside one `poll` call.
enum IOManagerState {
    /// Read from the transport and feed the bytes into the QUIC connection.
    IORecving,
    /// The connection timer fired; pending packets are flushed once and the
    /// manager then goes back to `IORecving`.
    Timeout,
    /// Pull outgoing packets from the QUIC connection and write them to the
    /// transport.
    IOSending,
    /// Poll the signal channel (`io_manager_rx`).
    ChannelRecving,
}
34 
/// Future that moves bytes between a transport `S` and a shared QUIC
/// connection, and reacts to the connection timer and to channel signals.
pub(crate) struct IOManager<S> {
    /// Transport that is read with `poll_read` and written with `poll_write`.
    io: S,
    /// QUIC connection shared behind a mutex; locked for every call made on it.
    conn: Arc<Mutex<QuicConn>>,
    /// Receiving end of the signal channel polled in the `ChannelRecving` state.
    io_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>,
    /// Sending end notified with `Ok(())` after received bytes were accepted by
    /// the connection.
    stream_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>,
    /// Timer built from the connection's `timeout()`; `None` when no timer is
    /// armed.
    recv_timeout: Option<Pin<Box<Sleep>>>,
    /// Current step of the state machine driven by `poll`.
    state: IOManagerState,
    /// Scratch buffer handed to the transport for each read.
    recv_buf: [u8; UPD_RECV_BUF_SIZE],
    /// Outgoing packet currently being written, together with write progress.
    send_data: SendData,
}
45 
46 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Sync + Send + 'static> IOManager<S> {
47     pub(crate) fn new(
48         io: S,
49         conn: Arc<Mutex<QuicConn>>,
50         io_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>,
51         stream_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>,
52     ) -> Self {
53         Self {
54             io,
55             conn,
56             io_manager_rx,
57             stream_manager_tx,
58             recv_timeout: None,
59             state: IOManagerState::IORecving,
60             recv_buf: [0u8; UPD_RECV_BUF_SIZE],
61             send_data: SendData::new(),
62         }
63     }
poll_recv_signalnull64     fn poll_recv_signal(
65         &mut self,
66         cx: &mut Context<'_>,
67     ) -> Poll<Result<Result<(), DispatchErrorKind>, DispatchErrorKind>> {
68         #[cfg(feature = "tokio_base")]
69         match self.io_manager_rx.poll_recv(cx) {
70             Poll::Ready(None) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)),
71             Poll::Ready(Some(data)) => Poll::Ready(Ok(data)),
72             Poll::Pending => Poll::Pending,
73         }
74         #[cfg(feature = "ylong_base")]
75         match self.io_manager_rx.poll_recv(cx) {
76             Poll::Ready(Err(_e)) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)),
77             Poll::Ready(Ok(data)) => Poll::Ready(Ok(data)),
78             Poll::Pending => Poll::Pending,
79         }
80     }
81 
poll_io_recvnull82     fn poll_io_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>> {
83         let mut buf = ReadBuf::new(&mut self.recv_buf);
84         if self.recv_timeout.is_none() {
85             if let Some(time) = self.conn.lock().unwrap().timeout() {
86                 self.recv_timeout = Some(Box::pin(sleep(time)));
87             };
88         }
89 
90         if let Some(delay) = self.recv_timeout.as_mut() {
91             if let Poll::Ready(()) = delay.as_mut().poll(cx) {
92                 self.recv_timeout = None;
93                 self.conn.lock().unwrap().on_timeout();
94                 self.state = IOManagerState::Timeout;
95                 return Poll::Ready(Ok(()));
96             }
97         }
98         match Pin::new(&mut self.io).poll_read(cx, &mut buf) {
99             Poll::Ready(Ok(())) => {
100                 let info = self.io.conn_detail();
101                 self.recv_timeout = None;
102                 let recv_info = quiche::RecvInfo {
103                     to: info.local,
104                     from: info.peer,
105                 };
106                 return match self.conn.lock().unwrap().recv(buf.filled_mut(), recv_info) {
107                     Ok(_) => {
108                         let _ = self.stream_manager_tx.send(Ok(()));
109                         // io recv once again
110                         Poll::Ready(Ok(()))
111                     }
112                     Err(e) => Poll::Ready(Err(DispatchErrorKind::Quic(e))),
113                 };
114             }
115             Poll::Ready(Err(e)) => Poll::Ready(Err(DispatchErrorKind::Io(e.kind()))),
116             Poll::Pending => {
117                 self.state = IOManagerState::IOSending;
118                 Poll::Pending
119             }
120         }
121     }
122 
poll_io_sendnull123     fn poll_io_send(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>> {
124         loop {
125             // UDP buf has not been sent to the peer, send rest UDP buf first
126             if self.send_data.buf_size == self.send_data.offset {
127                 // Retrieve the data to be sent via UDP from the connection
128                 let size = match self.conn.lock().unwrap().send(&mut self.send_data.buf) {
129                     Ok((size, _)) => size,
130                     Err(quiche::Error::Done) => {
131                         self.state = IOManagerState::ChannelRecving;
132                         return Poll::Ready(Ok(()));
133                     }
134                     Err(e) => {
135                         return Poll::Ready(Err(DispatchErrorKind::Quic(e)));
136                     }
137                 };
138                 self.send_data.buf_size = size;
139                 self.send_data.offset = 0;
140             }
141 
142             match Pin::new(&mut self.io).poll_write(
143                 cx,
144                 &self.send_data.buf[self.send_data.offset..self.send_data.buf_size],
145             ) {
146                 Poll::Ready(Ok(size)) => {
147                     self.send_data.offset += size;
148                     if self.send_data.offset != self.send_data.buf_size {
149                         // loop to send UDP buf
150                         continue;
151                     } else {
152                         self.send_data.offset = 0;
153                         self.send_data.buf_size = 0;
154                     }
155                 }
156                 Poll::Ready(Err(e)) => {
157                     return Poll::Ready(Err(DispatchErrorKind::Io(e.kind())));
158                 }
159                 Poll::Pending => {
160                     self.state = IOManagerState::ChannelRecving;
161                     return Poll::Pending;
162                 }
163             }
164         }
165     }
166 }
167 
168 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Sync + Send + 'static> Future for IOManager<S> {
169     type Output = Result<(), DispatchErrorKind>;
170 
pollnull171     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
172         let this = self.get_mut();
173         this.state = IOManagerState::IORecving;
174         loop {
175             match this.state {
176                 IOManagerState::IORecving => {
177                     if let Poll::Ready(Err(e)) = this.poll_io_recv(cx) {
178                         return Poll::Ready(Err(e));
179                     }
180                 }
181                 IOManagerState::IOSending => {
182                     if let Poll::Ready(Err(e)) = this.poll_io_send(cx) {
183                         return Poll::Ready(Err(e));
184                     }
185                 }
186                 IOManagerState::Timeout => {
187                     if let Poll::Ready(Err(e)) = this.poll_io_send(cx) {
188                         return Poll::Ready(Err(e));
189                     }
190                     // ensure pending at io recv
191                     this.state = IOManagerState::IORecving;
192                 }
193                 IOManagerState::ChannelRecving => match this.poll_recv_signal(cx) {
194                     // won't recv Err now
195                     Poll::Ready(Ok(_)) => {
196                         continue;
197                     }
198                     Poll::Ready(Err(e)) => {
199                         return Poll::Ready(Err(e));
200                     }
201                     Poll::Pending => {
202                         this.state = IOManagerState::IORecving;
203                         return Poll::Pending;
204                     }
205                 },
206             }
207         }
208     }
209 }
210 
/// Outgoing packet buffer together with the progress of writing it.
pub(crate) struct SendData {
    /// Storage filled by the QUIC connection with the packet to send.
    pub(crate) buf: [u8; UDP_SEND_BUF_SIZE],
    /// Number of valid bytes currently held in `buf`.
    pub(crate) buf_size: usize,
    /// Number of bytes of `buf` already written to the transport; equal to
    /// `buf_size` when nothing is left to send.
    pub(crate) offset: usize,
}
216 
217 impl SendData {
218     pub(crate) fn new() -> Self {
219         Self {
220             buf: [0u8; UDP_SEND_BUF_SIZE],
221             buf_size: 0,
222             offset: 0,
223         }
224     }
225 }
226