1// Copyright (c) 2023 Huawei Device Co., Ltd. 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14use std::io; 15use std::os::raw::c_int; 16use std::sync::atomic::{AtomicUsize, Ordering}; 17use std::time::Duration; 18 19use crate::{EventTrait, Interest, Token}; 20 21static NEXT_ID: AtomicUsize = AtomicUsize::new(1); 22 23/// An wrapper for different OS polling system. 24/// Linux: epoll 25/// Windows: iocp 26/// macos: kqueue 27pub struct Selector { 28 // selector id 29 id: usize, 30 // epoll fd 31 ep: i32, 32} 33 34impl Selector { 35 /// Creates a new Selector. 36 /// 37 /// # Error 38 /// If the underlying syscall fails, returns the corresponding error. 39 pub fn new() -> io::Result<Selector> { 40 let ep = match syscall!(epoll_create1(libc::EPOLL_CLOEXEC)) { 41 Ok(ep_sys) => ep_sys, 42 Err(err) => { 43 return Err(err); 44 } 45 }; 46 47 Ok(Selector { 48 id: NEXT_ID.fetch_add(1, Ordering::Relaxed), 49 ep, 50 }) 51 } 52 53 /// Waits for io events to come within a time limit. 54 pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> { 55 // Convert to milliseconds, if input time is none, it means the timeout is -1 56 // and wait permanently. 57 let timeout = timeout.map(|time| time.as_millis() as c_int).unwrap_or(-1); 58 59 events.clear(); 60 61 match syscall!(epoll_wait( 62 self.ep, 63 events.as_mut_ptr(), 64 events.capacity() as i32, 65 timeout 66 )) { 67 Ok(n_events) => unsafe { events.set_len(n_events as usize) }, 68 Err(err) => { 69 return Err(err); 70 } 71 } 72 Ok(()) 73 } 74 75 /// Registers the fd with specific interested events 76 pub fn register(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()> { 77 let mut sys_event = libc::epoll_event { 78 events: interests.into_io_event(), 79 u64: usize::from(token) as u64, 80 }; 81 82 match syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut sys_event)) { 83 Ok(_) => Ok(()), 84 Err(err) => Err(err), 85 } 86 } 87 88 /// Re-registers the fd with specific interested events 89 pub fn reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()> { 90 let mut sys_event = libc::epoll_event { 91 events: interests.into_io_event(), 92 u64: usize::from(token) as u64, 93 }; 94 95 match syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut sys_event)) { 96 Ok(_) => Ok(()), 97 Err(err) => Err(err), 98 } 99 } 100 101 /// De-registers the fd. 102 pub fn deregister(&self, fd: i32) -> io::Result<()> { 103 match syscall!(epoll_ctl( 104 self.ep, 105 libc::EPOLL_CTL_DEL, 106 fd, 107 std::ptr::null_mut() 108 )) { 109 Ok(_) => Ok(()), 110 Err(err) => Err(err), 111 } 112 } 113} 114 115impl Drop for Selector { 116 fn drop(&mut self) { 117 if let Err(_err) = syscall!(close(self.ep)) { 118 // todo: log the error 119 } 120 } 121} 122 123impl std::fmt::Debug for Selector { 124 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 125 write!(fmt, "epoll fd: {}, Select id: {}", self.ep, self.id) 126 } 127} 128 129/// A vector of events 130pub type Events = Vec<Event>; 131 132/// An io event 133pub type Event = libc::epoll_event; 134 135impl EventTrait for Event { 136 fn token(&self) -> Token { 137 Token(self.u64 as usize) 138 } 139 140 fn is_readable(&self) -> bool { 141 (self.events as libc::c_int & libc::EPOLLIN) != 0 142 || (self.events as libc::c_int & libc::EPOLLPRI) != 0 143 } 144 145 fn is_writable(&self) -> bool { 146 (self.events as libc::c_int & libc::EPOLLOUT) != 0 147 } 148 149 fn is_read_closed(&self) -> bool { 150 self.events as libc::c_int & libc::EPOLLHUP != 0 151 || (self.events as libc::c_int & libc::EPOLLIN != 0 152 && self.events as libc::c_int & libc::EPOLLRDHUP != 0) 153 } 154 155 fn is_write_closed(&self) -> bool { 156 self.events as libc::c_int & libc::EPOLLHUP != 0 157 || (self.events as libc::c_int & libc::EPOLLOUT != 0 158 && self.events as libc::c_int & libc::EPOLLERR != 0) 159 || self.events as libc::c_int == libc::EPOLLERR 160 } 161 162 fn is_error(&self) -> bool { 163 (self.events as libc::c_int & libc::EPOLLERR) != 0 164 } 165} 166 167#[cfg(test)] 168mod test { 169 use crate::sys::socket; 170 use crate::{Event, EventTrait, Interest, Selector, Token}; 171 172 /// UT cases for `Selector::reregister`. 173 /// 174 /// # Brief 175 /// 1. Create a Selector 176 /// 2. Reregister the selector 177 #[test] 178 fn ut_epoll_reregister() { 179 let selector = Selector::new().unwrap(); 180 let sock = socket::socket_new(libc::AF_UNIX, libc::SOCK_STREAM).unwrap(); 181 let ret = selector.register(sock, Token::from_usize(0), Interest::READABLE); 182 assert!(ret.is_ok()); 183 let ret = selector.reregister(sock, Token::from_usize(0), Interest::WRITABLE); 184 assert!(ret.is_ok()); 185 } 186 187 /// UT case for `Event::is_error` 188 /// 189 /// # Brief 190 /// 1. Create an event from libc::EPOLLERR 191 /// 2. Check if it's an error 192 #[test] 193 fn ut_event_is_err() { 194 let event = Event { 195 events: libc::EPOLLERR as u32, 196 u64: 0, 197 }; 198 assert!(event.is_error()); 199 } 200} 201