1// Copyright (c) 2023 Huawei Device Co., Ltd. 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14use std::ffi::c_void; 15use std::ops::{Deref, DerefMut}; 16use std::os::unix::io::RawFd; 17use std::time::Duration; 18use std::{cmp, io, mem, ptr}; 19 20use libc::{c_int, uintptr_t}; 21 22use crate::{EventTrait, Interest, Token}; 23 24/// An wrapper for different OS polling system. 25/// Linux: epoll 26/// Windows: iocp 27/// macos: kqueue 28#[derive(Debug)] 29pub struct Selector { 30 kq: RawFd, 31} 32 33impl Selector { 34 /// Creates a new Selector. 35 /// 36 /// # Error 37 /// If the underlying syscall fails, returns the corresponding error. 38 pub fn new() -> io::Result<Selector> { 39 let kq = syscall!(kqueue())?; 40 // make sure the fd closed when child process executes 41 syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC))?; 42 43 Ok(Selector { kq }) 44 } 45 46 /// Waits for io events to come within a time limit. 47 pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> { 48 events.clear(); 49 50 let timeout = timeout.map(|time| libc::timespec { 51 tv_sec: cmp::min(time.as_secs(), libc::time_t::MAX as u64) as libc::time_t, 52 // the cast is safe cause c_long::max > nanoseconds per second 53 tv_nsec: libc::c_long::from(time.subsec_nanos() as i32), 54 }); 55 56 let timeout_ptr = match timeout.as_ref() { 57 Some(t) => t as *const libc::timespec, 58 None => ptr::null_mut(), 59 }; 60 61 let n_events = syscall!(kevent( 62 self.kq, 63 ptr::null(), 64 0, 65 events.as_mut_ptr(), 66 events.capacity() as c_int, 67 timeout_ptr, 68 ))?; 69 unsafe { events.set_len(n_events as usize) }; 70 71 Ok(()) 72 } 73 74 /// Registers the fd with specific interested events 75 pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { 76 let flags = libc::EV_CLEAR | libc::EV_RECEIPT | libc::EV_ADD; 77 let mut events = Vec::with_capacity(2); 78 if interests.is_readable() { 79 let kevent = kevent_new(fd, libc::EVFILT_READ, flags, token.0); 80 events.push(kevent); 81 } 82 83 if interests.is_writable() { 84 let kevent = kevent_new(fd, libc::EVFILT_WRITE, flags, token.0); 85 events.push(kevent); 86 } 87 88 kevent_register(self.kq, events.as_mut_slice())?; 89 kevent_check_error(events.as_mut_slice(), &[libc::EPIPE as i64]) 90 } 91 92 /// Re-registers the fd with specific interested events 93 pub fn reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()> { 94 let flags = libc::EV_CLEAR | libc::EV_RECEIPT; 95 let mut events = Vec::with_capacity(2); 96 97 let r_flags = match interests.is_readable() { 98 true => flags | libc::EV_ADD, 99 false => flags | libc::EV_DELETE, 100 }; 101 102 let w_flags = match interests.is_writable() { 103 true => flags | libc::EV_ADD, 104 false => flags | libc::EV_DELETE, 105 }; 106 107 events.push(kevent_new(fd, libc::EVFILT_READ, r_flags, token.0)); 108 events.push(kevent_new(fd, libc::EVFILT_WRITE, w_flags, token.0)); 109 kevent_register(self.kq, events.as_mut_slice())?; 110 kevent_check_error(events.as_mut_slice(), &[libc::EPIPE as i64]) 111 } 112 113 /// De-registers the fd. 114 pub fn deregister(&self, fd: i32) -> io::Result<()> { 115 let flags = libc::EV_DELETE | libc::EV_RECEIPT; 116 let mut events = vec![ 117 kevent_new(fd, libc::EVFILT_READ, flags, 0), 118 kevent_new(fd, libc::EVFILT_WRITE, flags, 0), 119 ]; 120 kevent_register(self.kq, events.as_mut_slice())?; 121 kevent_check_error(events.as_mut_slice(), &[libc::ENOENT as i64]) 122 } 123 124 /// Try-clones the kqueue. 125 /// 126 /// If succeeds, returns a duplicate of the kqueue. 127 /// If fails, returns the last OS error. 128 pub fn try_clone(&self) -> io::Result<Selector> { 129 const LOWEST_FD: c_int = 3; 130 131 let kq = syscall!(fcntl(self.kq, libc::F_DUPFD_CLOEXEC, LOWEST_FD))?; 132 Ok(Selector { kq }) 133 } 134 135 /// Allows the kqueue to accept user-space notifications. Should be called 136 /// before `Selector::wake` 137 pub fn register_waker(&self, token: Token) -> io::Result<()> { 138 let event = kevent_new( 139 0, 140 libc::EVFILT_USER, 141 libc::EV_ADD | libc::EV_CLEAR | libc::EV_RECEIPT, 142 token.0, 143 ); 144 145 self.kevent_notify(event) 146 } 147 148 /// Sends a notification to wakeup the kqueue. Should be called after 149 /// `Selector::register_waker`. 150 pub fn wake(&self, token: Token) -> io::Result<()> { 151 let mut event = kevent_new( 152 0, 153 libc::EVFILT_USER, 154 libc::EV_ADD | libc::EV_RECEIPT, 155 token.0, 156 ); 157 event.fflags = libc::NOTE_TRIGGER; 158 self.kevent_notify(event) 159 } 160 161 #[inline] 162 fn kevent_notify(&self, mut event: Event) -> io::Result<()> { 163 syscall!(kevent(self.kq, &event, 1, &mut event, 1, ptr::null())).map(|_| { 164 if (event.flags & libc::EV_ERROR != 0) && event.data != 0 { 165 Err(io::Error::from_raw_os_error(event.data as i32)) 166 } else { 167 Ok(()) 168 } 169 })? 170 } 171} 172 173#[inline] 174fn kevent_register(kq: RawFd, events: &mut [Event]) -> io::Result<()> { 175 match syscall!(kevent( 176 kq, 177 events.as_ptr(), 178 events.len() as c_int, 179 events.as_mut_ptr(), 180 events.len() as c_int, 181 ptr::null(), 182 )) { 183 Ok(_) => Ok(()), 184 Err(e) => { 185 if let Some(libc::EINTR) = e.raw_os_error() { 186 Ok(()) 187 } else { 188 Err(e) 189 } 190 } 191 } 192} 193 194// this function should be called right after register 195fn kevent_check_error(events: &mut [Event], ignored: &[i64]) -> io::Result<()> { 196 for event in events { 197 let data = event.data as _; 198 if (event.flags & libc::EV_ERROR != 0) && data != 0 && !ignored.contains(&data) { 199 return Err(io::Error::from_raw_os_error(data as i32)); 200 } 201 } 202 Ok(()) 203} 204 205#[inline] 206fn kevent_new(ident: RawFd, filter: i16, flags: u16, udata: usize) -> Event { 207 Event { 208 ident: ident as uintptr_t, 209 filter, 210 flags, 211 udata: udata as *mut c_void, 212 ..unsafe { mem::zeroed() } 213 } 214} 215 216/// An io event 217pub type Event = libc::kevent; 218 219/// A vector of events 220pub struct Events(Vec<Event>); 221 222impl Events { 223 /// Initializes a vector of events with an initial capacity 224 pub fn with_capacity(capacity: usize) -> Self { 225 Events(Vec::with_capacity(capacity)) 226 } 227} 228 229impl Deref for Events { 230 type Target = Vec<Event>; 231 232 fn deref(&self) -> &Self::Target { 233 &self.0 234 } 235} 236 237impl DerefMut for Events { 238 fn deref_mut(&mut self) -> &mut Self::Target { 239 &mut self.0 240 } 241} 242 243// kevent has a member `udata` which has type of `*mut c_void`, therefore it 244// does not automatically derive Sync/Send. 245unsafe impl Send for Events {} 246unsafe impl Sync for Events {} 247 248impl EventTrait for Event { 249 fn token(&self) -> Token { 250 Token(self.udata as usize) 251 } 252 253 fn is_readable(&self) -> bool { 254 self.filter == libc::EVFILT_READ || self.filter == libc::EVFILT_USER 255 } 256 257 fn is_writable(&self) -> bool { 258 self.filter == libc::EVFILT_WRITE 259 } 260 261 fn is_read_closed(&self) -> bool { 262 self.filter == libc::EVFILT_READ && self.flags & libc::EV_EOF != 0 263 } 264 265 fn is_write_closed(&self) -> bool { 266 self.filter == libc::EVFILT_WRITE && self.flags & libc::EV_EOF != 0 267 } 268 269 fn is_error(&self) -> bool { 270 (self.flags & libc::EV_ERROR) != 0 || ((self.flags & libc::EV_EOF) != 0 && self.fflags != 0) 271 } 272} 273 274impl Drop for Selector { 275 fn drop(&mut self) { 276 if let Err(e) = syscall!(close(self.kq)) { 277 panic!("kqueue release failed: {e}"); 278 } 279 } 280} 281