1// Copyright (c) 2023 Huawei Device Co., Ltd.
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6//     http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use std::ffi::c_void;
15use std::ops::{Deref, DerefMut};
16use std::os::unix::io::RawFd;
17use std::time::Duration;
18use std::{cmp, io, mem, ptr};
19
20use libc::{c_int, uintptr_t};
21
22use crate::{EventTrait, Interest, Token};
23
/// A wrapper around the OS polling system.
///
/// - Linux: epoll
/// - Windows: iocp
/// - macos: kqueue
///
/// This implementation is the kqueue one.
#[derive(Debug)]
pub struct Selector {
    // File descriptor of the underlying kqueue. It is obtained from `kqueue()`
    // (or duplicated with `fcntl`) and closed when the Selector is dropped.
    kq: RawFd,
}
32
33impl Selector {
34    /// Creates a new Selector.
35    ///
36    /// # Error
37    /// If the underlying syscall fails, returns the corresponding error.
38    pub fn new() -> io::Result<Selector> {
39        let kq = syscall!(kqueue())?;
40        // make sure the fd closed when child process executes
41        syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC))?;
42
43        Ok(Selector { kq })
44    }
45
46    /// Waits for io events to come within a time limit.
47    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
48        events.clear();
49
50        let timeout = timeout.map(|time| libc::timespec {
51            tv_sec: cmp::min(time.as_secs(), libc::time_t::MAX as u64) as libc::time_t,
52            // the cast is safe cause c_long::max > nanoseconds per second
53            tv_nsec: libc::c_long::from(time.subsec_nanos() as i32),
54        });
55
56        let timeout_ptr = match timeout.as_ref() {
57            Some(t) => t as *const libc::timespec,
58            None => ptr::null_mut(),
59        };
60
61        let n_events = syscall!(kevent(
62            self.kq,
63            ptr::null(),
64            0,
65            events.as_mut_ptr(),
66            events.capacity() as c_int,
67            timeout_ptr,
68        ))?;
69        unsafe { events.set_len(n_events as usize) };
70
71        Ok(())
72    }
73
74    /// Registers the fd with specific interested events
75    pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
76        let flags = libc::EV_CLEAR | libc::EV_RECEIPT | libc::EV_ADD;
77        let mut events = Vec::with_capacity(2);
78        if interests.is_readable() {
79            let kevent = kevent_new(fd, libc::EVFILT_READ, flags, token.0);
80            events.push(kevent);
81        }
82
83        if interests.is_writable() {
84            let kevent = kevent_new(fd, libc::EVFILT_WRITE, flags, token.0);
85            events.push(kevent);
86        }
87
88        kevent_register(self.kq, events.as_mut_slice())?;
89        kevent_check_error(events.as_mut_slice(), &[libc::EPIPE as i64])
90    }
91
92    /// Re-registers the fd with specific interested events
93    pub fn reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()> {
94        let flags = libc::EV_CLEAR | libc::EV_RECEIPT;
95        let mut events = Vec::with_capacity(2);
96
97        let r_flags = match interests.is_readable() {
98            true => flags | libc::EV_ADD,
99            false => flags | libc::EV_DELETE,
100        };
101
102        let w_flags = match interests.is_writable() {
103            true => flags | libc::EV_ADD,
104            false => flags | libc::EV_DELETE,
105        };
106
107        events.push(kevent_new(fd, libc::EVFILT_READ, r_flags, token.0));
108        events.push(kevent_new(fd, libc::EVFILT_WRITE, w_flags, token.0));
109        kevent_register(self.kq, events.as_mut_slice())?;
110        kevent_check_error(events.as_mut_slice(), &[libc::EPIPE as i64])
111    }
112
113    /// De-registers the fd.
114    pub fn deregister(&self, fd: i32) -> io::Result<()> {
115        let flags = libc::EV_DELETE | libc::EV_RECEIPT;
116        let mut events = vec![
117            kevent_new(fd, libc::EVFILT_READ, flags, 0),
118            kevent_new(fd, libc::EVFILT_WRITE, flags, 0),
119        ];
120        kevent_register(self.kq, events.as_mut_slice())?;
121        kevent_check_error(events.as_mut_slice(), &[libc::ENOENT as i64])
122    }
123
124    /// Try-clones the kqueue.
125    ///
126    /// If succeeds, returns a duplicate of the kqueue.
127    /// If fails, returns the last OS error.
128    pub fn try_clone(&self) -> io::Result<Selector> {
129        const LOWEST_FD: c_int = 3;
130
131        let kq = syscall!(fcntl(self.kq, libc::F_DUPFD_CLOEXEC, LOWEST_FD))?;
132        Ok(Selector { kq })
133    }
134
135    /// Allows the kqueue to accept user-space notifications. Should be called
136    /// before `Selector::wake`
137    pub fn register_waker(&self, token: Token) -> io::Result<()> {
138        let event = kevent_new(
139            0,
140            libc::EVFILT_USER,
141            libc::EV_ADD | libc::EV_CLEAR | libc::EV_RECEIPT,
142            token.0,
143        );
144
145        self.kevent_notify(event)
146    }
147
148    /// Sends a notification to wakeup the kqueue. Should be called after
149    /// `Selector::register_waker`.
150    pub fn wake(&self, token: Token) -> io::Result<()> {
151        let mut event = kevent_new(
152            0,
153            libc::EVFILT_USER,
154            libc::EV_ADD | libc::EV_RECEIPT,
155            token.0,
156        );
157        event.fflags = libc::NOTE_TRIGGER;
158        self.kevent_notify(event)
159    }
160
161    #[inline]
162    fn kevent_notify(&self, mut event: Event) -> io::Result<()> {
163        syscall!(kevent(self.kq, &event, 1, &mut event, 1, ptr::null())).map(|_| {
164            if (event.flags & libc::EV_ERROR != 0) && event.data != 0 {
165                Err(io::Error::from_raw_os_error(event.data as i32))
166            } else {
167                Ok(())
168            }
169        })?
170    }
171}
172
173#[inline]
174fn kevent_register(kq: RawFd, events: &mut [Event]) -> io::Result<()> {
175    match syscall!(kevent(
176        kq,
177        events.as_ptr(),
178        events.len() as c_int,
179        events.as_mut_ptr(),
180        events.len() as c_int,
181        ptr::null(),
182    )) {
183        Ok(_) => Ok(()),
184        Err(e) => {
185            if let Some(libc::EINTR) = e.raw_os_error() {
186                Ok(())
187            } else {
188                Err(e)
189            }
190        }
191    }
192}
193
194// this function should be called right after register
195fn kevent_check_error(events: &mut [Event], ignored: &[i64]) -> io::Result<()> {
196    for event in events {
197        let data = event.data as _;
198        if (event.flags & libc::EV_ERROR != 0) && data != 0 && !ignored.contains(&data) {
199            return Err(io::Error::from_raw_os_error(data as i32));
200        }
201    }
202    Ok(())
203}
204
205#[inline]
206fn kevent_new(ident: RawFd, filter: i16, flags: u16, udata: usize) -> Event {
207    Event {
208        ident: ident as uintptr_t,
209        filter,
210        flags,
211        udata: udata as *mut c_void,
212        ..unsafe { mem::zeroed() }
213    }
214}
215
/// An io event, represented directly by the `kevent` structure from `libc`.
pub type Event = libc::kevent;
218
/// A vector of events.
///
/// Newtype over `Vec<Event>`; it serves as the output buffer that
/// `Selector::select` fills.
pub struct Events(Vec<Event>);
221
222impl Events {
223    /// Initializes a vector of events with an initial capacity
224    pub fn with_capacity(capacity: usize) -> Self {
225        Events(Vec::with_capacity(capacity))
226    }
227}
228
229impl Deref for Events {
230    type Target = Vec<Event>;
231
232    fn deref(&self) -> &Self::Target {
233        &self.0
234    }
235}
236
237impl DerefMut for Events {
238    fn deref_mut(&mut self) -> &mut Self::Target {
239        &mut self.0
240    }
241}
242
// `Event` (`libc::kevent`) has a member `udata` of type `*mut c_void`, so
// `Events` does not automatically implement Send/Sync; both are asserted
// manually here.
// NOTE(review): within this file `udata` only ever carries a `usize` token
// value cast to a pointer (see `kevent_new` and `EventTrait::token`) and is
// never dereferenced - confirm that no other code treats it as a real pointer.
unsafe impl Send for Events {}
unsafe impl Sync for Events {}
247
248impl EventTrait for Event {
249    fn token(&self) -> Token {
250        Token(self.udata as usize)
251    }
252
253    fn is_readable(&self) -> bool {
254        self.filter == libc::EVFILT_READ || self.filter == libc::EVFILT_USER
255    }
256
257    fn is_writable(&self) -> bool {
258        self.filter == libc::EVFILT_WRITE
259    }
260
261    fn is_read_closed(&self) -> bool {
262        self.filter == libc::EVFILT_READ && self.flags & libc::EV_EOF != 0
263    }
264
265    fn is_write_closed(&self) -> bool {
266        self.filter == libc::EVFILT_WRITE && self.flags & libc::EV_EOF != 0
267    }
268
269    fn is_error(&self) -> bool {
270        (self.flags & libc::EV_ERROR) != 0 || ((self.flags & libc::EV_EOF) != 0 && self.fflags != 0)
271    }
272}
273
274impl Drop for Selector {
275    fn drop(&mut self) {
276        if let Err(e) = syscall!(close(self.kq)) {
277            panic!("kqueue release failed: {e}");
278        }
279    }
280}
281