1cac7dca0Sopenharmony_ci// Copyright (c) 2023 Huawei Device Co., Ltd.
2cac7dca0Sopenharmony_ci// Licensed under the Apache License, Version 2.0 (the "License");
3cac7dca0Sopenharmony_ci// you may not use this file except in compliance with the License.
4cac7dca0Sopenharmony_ci// You may obtain a copy of the License at
5cac7dca0Sopenharmony_ci//
6cac7dca0Sopenharmony_ci//     http://www.apache.org/licenses/LICENSE-2.0
7cac7dca0Sopenharmony_ci//
8cac7dca0Sopenharmony_ci// Unless required by applicable law or agreed to in writing, software
9cac7dca0Sopenharmony_ci// distributed under the License is distributed on an "AS IS" BASIS,
10cac7dca0Sopenharmony_ci// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11cac7dca0Sopenharmony_ci// See the License for the specific language governing permissions and
12cac7dca0Sopenharmony_ci// limitations under the License.
13cac7dca0Sopenharmony_ci
14cac7dca0Sopenharmony_ciuse std::ffi::c_void;
15cac7dca0Sopenharmony_ciuse std::ops::{Deref, DerefMut};
16cac7dca0Sopenharmony_ciuse std::os::unix::io::RawFd;
17cac7dca0Sopenharmony_ciuse std::time::Duration;
18cac7dca0Sopenharmony_ciuse std::{cmp, io, mem, ptr};
19cac7dca0Sopenharmony_ci
20cac7dca0Sopenharmony_ciuse libc::{c_int, uintptr_t};
21cac7dca0Sopenharmony_ci
22cac7dca0Sopenharmony_ciuse crate::{EventTrait, Interest, Token};
23cac7dca0Sopenharmony_ci
/// A wrapper around the OS polling system.
///
/// The backend differs per platform:
/// - Linux: epoll
/// - Windows: iocp
/// - macOS: kqueue
///
/// This is the kqueue implementation: the selector owns the raw kqueue
/// descriptor created in `Selector::new` and closes it when dropped.
#[derive(Debug)]
pub struct Selector {
    /// Raw file descriptor of the underlying kqueue.
    kq: RawFd,
}
32cac7dca0Sopenharmony_ci
33cac7dca0Sopenharmony_ciimpl Selector {
34cac7dca0Sopenharmony_ci    /// Creates a new Selector.
35cac7dca0Sopenharmony_ci    ///
36cac7dca0Sopenharmony_ci    /// # Error
37cac7dca0Sopenharmony_ci    /// If the underlying syscall fails, returns the corresponding error.
38cac7dca0Sopenharmony_ci    pub fn new() -> io::Result<Selector> {
39cac7dca0Sopenharmony_ci        let kq = syscall!(kqueue())?;
40cac7dca0Sopenharmony_ci        // make sure the fd closed when child process executes
41cac7dca0Sopenharmony_ci        syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC))?;
42cac7dca0Sopenharmony_ci
43cac7dca0Sopenharmony_ci        Ok(Selector { kq })
44cac7dca0Sopenharmony_ci    }
45cac7dca0Sopenharmony_ci
46cac7dca0Sopenharmony_ci    /// Waits for io events to come within a time limit.
47cac7dca0Sopenharmony_ci    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
48cac7dca0Sopenharmony_ci        events.clear();
49cac7dca0Sopenharmony_ci
50cac7dca0Sopenharmony_ci        let timeout = timeout.map(|time| libc::timespec {
51cac7dca0Sopenharmony_ci            tv_sec: cmp::min(time.as_secs(), libc::time_t::MAX as u64) as libc::time_t,
52cac7dca0Sopenharmony_ci            // the cast is safe cause c_long::max > nanoseconds per second
53cac7dca0Sopenharmony_ci            tv_nsec: libc::c_long::from(time.subsec_nanos() as i32),
54cac7dca0Sopenharmony_ci        });
55cac7dca0Sopenharmony_ci
56cac7dca0Sopenharmony_ci        let timeout_ptr = match timeout.as_ref() {
57cac7dca0Sopenharmony_ci            Some(t) => t as *const libc::timespec,
58cac7dca0Sopenharmony_ci            None => ptr::null_mut(),
59cac7dca0Sopenharmony_ci        };
60cac7dca0Sopenharmony_ci
61cac7dca0Sopenharmony_ci        let n_events = syscall!(kevent(
62cac7dca0Sopenharmony_ci            self.kq,
63cac7dca0Sopenharmony_ci            ptr::null(),
64cac7dca0Sopenharmony_ci            0,
65cac7dca0Sopenharmony_ci            events.as_mut_ptr(),
66cac7dca0Sopenharmony_ci            events.capacity() as c_int,
67cac7dca0Sopenharmony_ci            timeout_ptr,
68cac7dca0Sopenharmony_ci        ))?;
69cac7dca0Sopenharmony_ci        unsafe { events.set_len(n_events as usize) };
70cac7dca0Sopenharmony_ci
71cac7dca0Sopenharmony_ci        Ok(())
72cac7dca0Sopenharmony_ci    }
73cac7dca0Sopenharmony_ci
74cac7dca0Sopenharmony_ci    /// Registers the fd with specific interested events
75cac7dca0Sopenharmony_ci    pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
76cac7dca0Sopenharmony_ci        let flags = libc::EV_CLEAR | libc::EV_RECEIPT | libc::EV_ADD;
77cac7dca0Sopenharmony_ci        let mut events = Vec::with_capacity(2);
78cac7dca0Sopenharmony_ci        if interests.is_readable() {
79cac7dca0Sopenharmony_ci            let kevent = kevent_new(fd, libc::EVFILT_READ, flags, token.0);
80cac7dca0Sopenharmony_ci            events.push(kevent);
81cac7dca0Sopenharmony_ci        }
82cac7dca0Sopenharmony_ci
83cac7dca0Sopenharmony_ci        if interests.is_writable() {
84cac7dca0Sopenharmony_ci            let kevent = kevent_new(fd, libc::EVFILT_WRITE, flags, token.0);
85cac7dca0Sopenharmony_ci            events.push(kevent);
86cac7dca0Sopenharmony_ci        }
87cac7dca0Sopenharmony_ci
88cac7dca0Sopenharmony_ci        kevent_register(self.kq, events.as_mut_slice())?;
89cac7dca0Sopenharmony_ci        kevent_check_error(events.as_mut_slice(), &[libc::EPIPE as i64])
90cac7dca0Sopenharmony_ci    }
91cac7dca0Sopenharmony_ci
92cac7dca0Sopenharmony_ci    /// Re-registers the fd with specific interested events
93cac7dca0Sopenharmony_ci    pub fn reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()> {
94cac7dca0Sopenharmony_ci        let flags = libc::EV_CLEAR | libc::EV_RECEIPT;
95cac7dca0Sopenharmony_ci        let mut events = Vec::with_capacity(2);
96cac7dca0Sopenharmony_ci
97cac7dca0Sopenharmony_ci        let r_flags = match interests.is_readable() {
98cac7dca0Sopenharmony_ci            true => flags | libc::EV_ADD,
99cac7dca0Sopenharmony_ci            false => flags | libc::EV_DELETE,
100cac7dca0Sopenharmony_ci        };
101cac7dca0Sopenharmony_ci
102cac7dca0Sopenharmony_ci        let w_flags = match interests.is_writable() {
103cac7dca0Sopenharmony_ci            true => flags | libc::EV_ADD,
104cac7dca0Sopenharmony_ci            false => flags | libc::EV_DELETE,
105cac7dca0Sopenharmony_ci        };
106cac7dca0Sopenharmony_ci
107cac7dca0Sopenharmony_ci        events.push(kevent_new(fd, libc::EVFILT_READ, r_flags, token.0));
108cac7dca0Sopenharmony_ci        events.push(kevent_new(fd, libc::EVFILT_WRITE, w_flags, token.0));
109cac7dca0Sopenharmony_ci        kevent_register(self.kq, events.as_mut_slice())?;
110cac7dca0Sopenharmony_ci        kevent_check_error(events.as_mut_slice(), &[libc::EPIPE as i64])
111cac7dca0Sopenharmony_ci    }
112cac7dca0Sopenharmony_ci
113cac7dca0Sopenharmony_ci    /// De-registers the fd.
114cac7dca0Sopenharmony_ci    pub fn deregister(&self, fd: i32) -> io::Result<()> {
115cac7dca0Sopenharmony_ci        let flags = libc::EV_DELETE | libc::EV_RECEIPT;
116cac7dca0Sopenharmony_ci        let mut events = vec![
117cac7dca0Sopenharmony_ci            kevent_new(fd, libc::EVFILT_READ, flags, 0),
118cac7dca0Sopenharmony_ci            kevent_new(fd, libc::EVFILT_WRITE, flags, 0),
119cac7dca0Sopenharmony_ci        ];
120cac7dca0Sopenharmony_ci        kevent_register(self.kq, events.as_mut_slice())?;
121cac7dca0Sopenharmony_ci        kevent_check_error(events.as_mut_slice(), &[libc::ENOENT as i64])
122cac7dca0Sopenharmony_ci    }
123cac7dca0Sopenharmony_ci
124cac7dca0Sopenharmony_ci    /// Try-clones the kqueue.
125cac7dca0Sopenharmony_ci    ///
126cac7dca0Sopenharmony_ci    /// If succeeds, returns a duplicate of the kqueue.
127cac7dca0Sopenharmony_ci    /// If fails, returns the last OS error.
128cac7dca0Sopenharmony_ci    pub fn try_clone(&self) -> io::Result<Selector> {
129cac7dca0Sopenharmony_ci        const LOWEST_FD: c_int = 3;
130cac7dca0Sopenharmony_ci
131cac7dca0Sopenharmony_ci        let kq = syscall!(fcntl(self.kq, libc::F_DUPFD_CLOEXEC, LOWEST_FD))?;
132cac7dca0Sopenharmony_ci        Ok(Selector { kq })
133cac7dca0Sopenharmony_ci    }
134cac7dca0Sopenharmony_ci
135cac7dca0Sopenharmony_ci    /// Allows the kqueue to accept user-space notifications. Should be called
136cac7dca0Sopenharmony_ci    /// before `Selector::wake`
137cac7dca0Sopenharmony_ci    pub fn register_waker(&self, token: Token) -> io::Result<()> {
138cac7dca0Sopenharmony_ci        let event = kevent_new(
139cac7dca0Sopenharmony_ci            0,
140cac7dca0Sopenharmony_ci            libc::EVFILT_USER,
141cac7dca0Sopenharmony_ci            libc::EV_ADD | libc::EV_CLEAR | libc::EV_RECEIPT,
142cac7dca0Sopenharmony_ci            token.0,
143cac7dca0Sopenharmony_ci        );
144cac7dca0Sopenharmony_ci
145cac7dca0Sopenharmony_ci        self.kevent_notify(event)
146cac7dca0Sopenharmony_ci    }
147cac7dca0Sopenharmony_ci
148cac7dca0Sopenharmony_ci    /// Sends a notification to wakeup the kqueue. Should be called after
149cac7dca0Sopenharmony_ci    /// `Selector::register_waker`.
150cac7dca0Sopenharmony_ci    pub fn wake(&self, token: Token) -> io::Result<()> {
151cac7dca0Sopenharmony_ci        let mut event = kevent_new(
152cac7dca0Sopenharmony_ci            0,
153cac7dca0Sopenharmony_ci            libc::EVFILT_USER,
154cac7dca0Sopenharmony_ci            libc::EV_ADD | libc::EV_RECEIPT,
155cac7dca0Sopenharmony_ci            token.0,
156cac7dca0Sopenharmony_ci        );
157cac7dca0Sopenharmony_ci        event.fflags = libc::NOTE_TRIGGER;
158cac7dca0Sopenharmony_ci        self.kevent_notify(event)
159cac7dca0Sopenharmony_ci    }
160cac7dca0Sopenharmony_ci
161cac7dca0Sopenharmony_ci    #[inline]
162cac7dca0Sopenharmony_ci    fn kevent_notify(&self, mut event: Event) -> io::Result<()> {
163cac7dca0Sopenharmony_ci        syscall!(kevent(self.kq, &event, 1, &mut event, 1, ptr::null())).map(|_| {
164cac7dca0Sopenharmony_ci            if (event.flags & libc::EV_ERROR != 0) && event.data != 0 {
165cac7dca0Sopenharmony_ci                Err(io::Error::from_raw_os_error(event.data as i32))
166cac7dca0Sopenharmony_ci            } else {
167cac7dca0Sopenharmony_ci                Ok(())
168cac7dca0Sopenharmony_ci            }
169cac7dca0Sopenharmony_ci        })?
170cac7dca0Sopenharmony_ci    }
171cac7dca0Sopenharmony_ci}
172cac7dca0Sopenharmony_ci
173cac7dca0Sopenharmony_ci#[inline]
174cac7dca0Sopenharmony_cifn kevent_register(kq: RawFd, events: &mut [Event]) -> io::Result<()> {
175cac7dca0Sopenharmony_ci    match syscall!(kevent(
176cac7dca0Sopenharmony_ci        kq,
177cac7dca0Sopenharmony_ci        events.as_ptr(),
178cac7dca0Sopenharmony_ci        events.len() as c_int,
179cac7dca0Sopenharmony_ci        events.as_mut_ptr(),
180cac7dca0Sopenharmony_ci        events.len() as c_int,
181cac7dca0Sopenharmony_ci        ptr::null(),
182cac7dca0Sopenharmony_ci    )) {
183cac7dca0Sopenharmony_ci        Ok(_) => Ok(()),
184cac7dca0Sopenharmony_ci        Err(e) => {
185cac7dca0Sopenharmony_ci            if let Some(libc::EINTR) = e.raw_os_error() {
186cac7dca0Sopenharmony_ci                Ok(())
187cac7dca0Sopenharmony_ci            } else {
188cac7dca0Sopenharmony_ci                Err(e)
189cac7dca0Sopenharmony_ci            }
190cac7dca0Sopenharmony_ci        }
191cac7dca0Sopenharmony_ci    }
192cac7dca0Sopenharmony_ci}
193cac7dca0Sopenharmony_ci
194cac7dca0Sopenharmony_ci// this function should be called right after register
195cac7dca0Sopenharmony_cifn kevent_check_error(events: &mut [Event], ignored: &[i64]) -> io::Result<()> {
196cac7dca0Sopenharmony_ci    for event in events {
197cac7dca0Sopenharmony_ci        let data = event.data as _;
198cac7dca0Sopenharmony_ci        if (event.flags & libc::EV_ERROR != 0) && data != 0 && !ignored.contains(&data) {
199cac7dca0Sopenharmony_ci            return Err(io::Error::from_raw_os_error(data as i32));
200cac7dca0Sopenharmony_ci        }
201cac7dca0Sopenharmony_ci    }
202cac7dca0Sopenharmony_ci    Ok(())
203cac7dca0Sopenharmony_ci}
204cac7dca0Sopenharmony_ci
205cac7dca0Sopenharmony_ci#[inline]
206cac7dca0Sopenharmony_cifn kevent_new(ident: RawFd, filter: i16, flags: u16, udata: usize) -> Event {
207cac7dca0Sopenharmony_ci    Event {
208cac7dca0Sopenharmony_ci        ident: ident as uintptr_t,
209cac7dca0Sopenharmony_ci        filter,
210cac7dca0Sopenharmony_ci        flags,
211cac7dca0Sopenharmony_ci        udata: udata as *mut c_void,
212cac7dca0Sopenharmony_ci        ..unsafe { mem::zeroed() }
213cac7dca0Sopenharmony_ci    }
214cac7dca0Sopenharmony_ci}
215cac7dca0Sopenharmony_ci
/// An io event.
///
/// On this backend an event is the raw `libc::kevent` record filled in by
/// the kernel; the `EventTrait` implementation interprets its fields.
pub type Event = libc::kevent;
218cac7dca0Sopenharmony_ci
/// A vector of events.
///
/// Newtype over `Vec<Event>` that serves as the output buffer of
/// `Selector::select`: the selector clears it and lets the kernel fill it up
/// to its capacity.
pub struct Events(Vec<Event>);
221cac7dca0Sopenharmony_ci
222cac7dca0Sopenharmony_ciimpl Events {
223cac7dca0Sopenharmony_ci    /// Initializes a vector of events with an initial capacity
224cac7dca0Sopenharmony_ci    pub fn with_capacity(capacity: usize) -> Self {
225cac7dca0Sopenharmony_ci        Events(Vec::with_capacity(capacity))
226cac7dca0Sopenharmony_ci    }
227cac7dca0Sopenharmony_ci}
228cac7dca0Sopenharmony_ci
229cac7dca0Sopenharmony_ciimpl Deref for Events {
230cac7dca0Sopenharmony_ci    type Target = Vec<Event>;
231cac7dca0Sopenharmony_ci
232cac7dca0Sopenharmony_ci    fn deref(&self) -> &Self::Target {
233cac7dca0Sopenharmony_ci        &self.0
234cac7dca0Sopenharmony_ci    }
235cac7dca0Sopenharmony_ci}
236cac7dca0Sopenharmony_ci
237cac7dca0Sopenharmony_ciimpl DerefMut for Events {
238cac7dca0Sopenharmony_ci    fn deref_mut(&mut self) -> &mut Self::Target {
239cac7dca0Sopenharmony_ci        &mut self.0
240cac7dca0Sopenharmony_ci    }
241cac7dca0Sopenharmony_ci}
242cac7dca0Sopenharmony_ci
// kevent has a member `udata` of type `*mut c_void`, so `Events` does not get
// Send/Sync automatically. In this file `udata` is only ever an integer token
// cast to a pointer (see `kevent_new`) and is cast straight back to `usize`
// in `EventTrait::token`; it is never dereferenced here.
unsafe impl Send for Events {}
unsafe impl Sync for Events {}
247cac7dca0Sopenharmony_ci
248cac7dca0Sopenharmony_ciimpl EventTrait for Event {
249cac7dca0Sopenharmony_ci    fn token(&self) -> Token {
250cac7dca0Sopenharmony_ci        Token(self.udata as usize)
251cac7dca0Sopenharmony_ci    }
252cac7dca0Sopenharmony_ci
253cac7dca0Sopenharmony_ci    fn is_readable(&self) -> bool {
254cac7dca0Sopenharmony_ci        self.filter == libc::EVFILT_READ || self.filter == libc::EVFILT_USER
255cac7dca0Sopenharmony_ci    }
256cac7dca0Sopenharmony_ci
257cac7dca0Sopenharmony_ci    fn is_writable(&self) -> bool {
258cac7dca0Sopenharmony_ci        self.filter == libc::EVFILT_WRITE
259cac7dca0Sopenharmony_ci    }
260cac7dca0Sopenharmony_ci
261cac7dca0Sopenharmony_ci    fn is_read_closed(&self) -> bool {
262cac7dca0Sopenharmony_ci        self.filter == libc::EVFILT_READ && self.flags & libc::EV_EOF != 0
263cac7dca0Sopenharmony_ci    }
264cac7dca0Sopenharmony_ci
265cac7dca0Sopenharmony_ci    fn is_write_closed(&self) -> bool {
266cac7dca0Sopenharmony_ci        self.filter == libc::EVFILT_WRITE && self.flags & libc::EV_EOF != 0
267cac7dca0Sopenharmony_ci    }
268cac7dca0Sopenharmony_ci
269cac7dca0Sopenharmony_ci    fn is_error(&self) -> bool {
270cac7dca0Sopenharmony_ci        (self.flags & libc::EV_ERROR) != 0 || ((self.flags & libc::EV_EOF) != 0 && self.fflags != 0)
271cac7dca0Sopenharmony_ci    }
272cac7dca0Sopenharmony_ci}
273cac7dca0Sopenharmony_ci
274cac7dca0Sopenharmony_ciimpl Drop for Selector {
275cac7dca0Sopenharmony_ci    fn drop(&mut self) {
276cac7dca0Sopenharmony_ci        if let Err(e) = syscall!(close(self.kq)) {
277cac7dca0Sopenharmony_ci            panic!("kqueue release failed: {e}");
278cac7dca0Sopenharmony_ci        }
279cac7dca0Sopenharmony_ci    }
280cac7dca0Sopenharmony_ci}
281