1f857971dSopenharmony_ci/* 2f857971dSopenharmony_ci * Copyright (C) 2023 Huawei Device Co., Ltd. 3f857971dSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4f857971dSopenharmony_ci * you may not use this file except in compliance with the License. 5f857971dSopenharmony_ci * You may obtain a copy of the License at 6f857971dSopenharmony_ci * 7f857971dSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8f857971dSopenharmony_ci * 9f857971dSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10f857971dSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11f857971dSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12f857971dSopenharmony_ci * See the License for the specific language governing permissions and 13f857971dSopenharmony_ci * limitations under the License. 14f857971dSopenharmony_ci */ 15f857971dSopenharmony_ci 16f857971dSopenharmony_ci//! Implementation of epoll event loop. 17f857971dSopenharmony_ci 18f857971dSopenharmony_ci#![allow(dead_code)] 19f857971dSopenharmony_ci#![allow(unused_variables)] 20f857971dSopenharmony_ci 21f857971dSopenharmony_ciuse std::collections::BTreeMap; 22f857971dSopenharmony_ciuse std::ffi::{ c_void, c_char, c_int, CString }; 23f857971dSopenharmony_ciuse std::future::Future; 24f857971dSopenharmony_ciuse std::io::Error; 25f857971dSopenharmony_ciuse std::os::fd::RawFd; 26f857971dSopenharmony_ciuse std::pin::Pin; 27f857971dSopenharmony_ciuse std::sync::{ Arc, Mutex }; 28f857971dSopenharmony_ciuse std::sync::atomic::{ AtomicBool, Ordering }; 29f857971dSopenharmony_ciuse std::task::{ Context, Poll, Waker }; 30f857971dSopenharmony_ciuse fusion_utils_rust::{ call_debug_enter, FusionErrorCode, FusionResult }; 31f857971dSopenharmony_ciuse hilog_rust::{ debug, info, error, hilog, HiLogLabel, LogType }; 32f857971dSopenharmony_ci 33f857971dSopenharmony_ci/// Indicating data other than high-priority data can be read. 34f857971dSopenharmony_cipub const LIBC_EPOLLIN: u32 = libc::EPOLLIN as u32; 35f857971dSopenharmony_ciconst LIBC_EPOLLONESHOT: u32 = libc::EPOLLONESHOT as u32; 36f857971dSopenharmony_ci/// Indicating an error has occurred. 37f857971dSopenharmony_cipub const LIBC_EPOLLERR: u32 = libc::EPOLLERR as u32; 38f857971dSopenharmony_ci/// Indicating a hangup has occurred. 39f857971dSopenharmony_cipub const LIBC_EPOLLHUP: u32 = libc::EPOLLHUP as u32; 40f857971dSopenharmony_ciconst LIBC_EPOLLALL: u32 = LIBC_EPOLLIN | LIBC_EPOLLERR | LIBC_EPOLLHUP; 41f857971dSopenharmony_ciconst LIBC_EPOLLNONE: u32 = 0; 42f857971dSopenharmony_ciconst MAX_EPOLL_EVENTS: c_int = 128; 43f857971dSopenharmony_ciconst EPOLL_SUCCESS: c_int = 0; 44f857971dSopenharmony_ciconst EPOLL_FAILURE: c_int = -1; 45f857971dSopenharmony_ciconst NO_TIMEOUT: c_int = -1; 46f857971dSopenharmony_ciconst SYSTEM_IO_FAILURE: libc::ssize_t = -1; 47f857971dSopenharmony_ciconst INVALID_FD: RawFd = -1; 48f857971dSopenharmony_ciconst LOG_LABEL: HiLogLabel = HiLogLabel { 49f857971dSopenharmony_ci log_type: LogType::LogCore, 50f857971dSopenharmony_ci domain: 0xD002220, 51f857971dSopenharmony_ci tag: "Scheduler", 52f857971dSopenharmony_ci}; 53f857971dSopenharmony_ci 54f857971dSopenharmony_ci/// Abstraction of epoll handler. 55f857971dSopenharmony_cipub trait IEpollHandler: Send + Sync { 56f857971dSopenharmony_ci /// Return file descriptor of this epoll handler. 57f857971dSopenharmony_ci fn fd(&self) -> RawFd; 58f857971dSopenharmony_ci /// Dispatch epoll events to this epoll handler. 59f857971dSopenharmony_ci fn dispatch(&self, events: u32); 60f857971dSopenharmony_ci} 61f857971dSopenharmony_ci 62f857971dSopenharmony_cistruct EpollEvent { 63f857971dSopenharmony_ci fd: RawFd, 64f857971dSopenharmony_ci events: u32, 65f857971dSopenharmony_ci} 66f857971dSopenharmony_ci 67f857971dSopenharmony_cistruct EpollHandler { 68f857971dSopenharmony_ci raw: Arc<dyn IEpollHandler>, 69f857971dSopenharmony_ci handle: ylong_runtime::task::JoinHandle<()>, 70f857971dSopenharmony_ci waker: Option<Waker>, 71f857971dSopenharmony_ci events: u32, 72f857971dSopenharmony_ci} 73f857971dSopenharmony_ci 74f857971dSopenharmony_ciimpl EpollHandler { 75f857971dSopenharmony_ci fn new(raw: Arc<dyn IEpollHandler>, handle: ylong_runtime::task::JoinHandle<()>) -> Self 76f857971dSopenharmony_ci { 77f857971dSopenharmony_ci Self { 78f857971dSopenharmony_ci raw, 79f857971dSopenharmony_ci handle, 80f857971dSopenharmony_ci waker: Default::default(), 81f857971dSopenharmony_ci events: Default::default(), 82f857971dSopenharmony_ci } 83f857971dSopenharmony_ci } 84f857971dSopenharmony_ci 85f857971dSopenharmony_ci #[inline] 86f857971dSopenharmony_ci fn fd(&self) -> RawFd 87f857971dSopenharmony_ci { 88f857971dSopenharmony_ci self.raw.fd() 89f857971dSopenharmony_ci } 90f857971dSopenharmony_ci 91f857971dSopenharmony_ci #[inline] 92f857971dSopenharmony_ci fn raw_handler(&self) -> Arc<dyn IEpollHandler> 93f857971dSopenharmony_ci { 94f857971dSopenharmony_ci self.raw.clone() 95f857971dSopenharmony_ci } 96f857971dSopenharmony_ci 97f857971dSopenharmony_ci #[inline] 98f857971dSopenharmony_ci fn set_waker(&mut self, waker: &Waker) 99f857971dSopenharmony_ci { 100f857971dSopenharmony_ci self.waker.replace(waker.clone()); 101f857971dSopenharmony_ci } 102f857971dSopenharmony_ci 103f857971dSopenharmony_ci #[inline] 104f857971dSopenharmony_ci fn take_events(&mut self) -> u32 105f857971dSopenharmony_ci { 106f857971dSopenharmony_ci let events = self.events; 107f857971dSopenharmony_ci self.events = Default::default(); 108f857971dSopenharmony_ci events 109f857971dSopenharmony_ci } 110f857971dSopenharmony_ci} 111f857971dSopenharmony_ci 112f857971dSopenharmony_ciimpl Drop for EpollHandler { 113f857971dSopenharmony_ci fn drop(&mut self) 114f857971dSopenharmony_ci { 115f857971dSopenharmony_ci self.handle.cancel(); 116f857971dSopenharmony_ci } 117f857971dSopenharmony_ci} 118f857971dSopenharmony_ci 119f857971dSopenharmony_ci/// `Driver` encapsulate event loop of epoll. 120f857971dSopenharmony_cistruct Driver { 121f857971dSopenharmony_ci epoll: Arc<Epoll>, 122f857971dSopenharmony_ci is_running: Arc<AtomicBool>, 123f857971dSopenharmony_ci} 124f857971dSopenharmony_ci 125f857971dSopenharmony_ciimpl Driver { 126f857971dSopenharmony_ci fn new(epoll: Arc<Epoll>, is_running: Arc<AtomicBool>) -> Self 127f857971dSopenharmony_ci { 128f857971dSopenharmony_ci Self { epoll, is_running } 129f857971dSopenharmony_ci } 130f857971dSopenharmony_ci 131f857971dSopenharmony_ci #[inline] 132f857971dSopenharmony_ci fn is_running(&self) -> bool 133f857971dSopenharmony_ci { 134f857971dSopenharmony_ci self.is_running.load(Ordering::Relaxed) 135f857971dSopenharmony_ci } 136f857971dSopenharmony_ci 137f857971dSopenharmony_ci fn run(&self) 138f857971dSopenharmony_ci { 139f857971dSopenharmony_ci call_debug_enter!("Driver::run"); 140f857971dSopenharmony_ci while self.is_running() { 141f857971dSopenharmony_ci if let Some(epoll_events) = self.epoll.epoll_wait() { 142f857971dSopenharmony_ci if !self.is_running() { 143f857971dSopenharmony_ci info!(LOG_LABEL, "Driver stopped running"); 144f857971dSopenharmony_ci break; 145f857971dSopenharmony_ci } 146f857971dSopenharmony_ci self.epoll.wake(&epoll_events); 147f857971dSopenharmony_ci } 148f857971dSopenharmony_ci } 149f857971dSopenharmony_ci } 150f857971dSopenharmony_ci} 151f857971dSopenharmony_ci 152f857971dSopenharmony_cistruct Epoll { 153f857971dSopenharmony_ci epoll_fd: RawFd, 154f857971dSopenharmony_ci handlers: Mutex<BTreeMap<RawFd, EpollHandler>>, 155f857971dSopenharmony_ci} 156f857971dSopenharmony_ci 157f857971dSopenharmony_ciimpl Epoll { 158f857971dSopenharmony_ci fn new() -> Self 159f857971dSopenharmony_ci { 160f857971dSopenharmony_ci // SAFETY: 161f857971dSopenharmony_ci // The epoll API is multi-thread safe. 162f857971dSopenharmony_ci // This is a normal system call, no safety pitfall. 163f857971dSopenharmony_ci let epoll_fd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) }; 164f857971dSopenharmony_ci assert_ne!(epoll_fd, INVALID_FD, "epoll_create1 fail: {:?}", Error::last_os_error()); 165f857971dSopenharmony_ci Self { 166f857971dSopenharmony_ci epoll_fd, 167f857971dSopenharmony_ci handlers: Mutex::default(), 168f857971dSopenharmony_ci } 169f857971dSopenharmony_ci } 170f857971dSopenharmony_ci 171f857971dSopenharmony_ci #[inline] 172f857971dSopenharmony_ci fn fd(&self) -> RawFd 173f857971dSopenharmony_ci { 174f857971dSopenharmony_ci self.epoll_fd 175f857971dSopenharmony_ci } 176f857971dSopenharmony_ci 177f857971dSopenharmony_ci fn epoll_add(&self, fd: RawFd) -> FusionResult<()> 178f857971dSopenharmony_ci { 179f857971dSopenharmony_ci call_debug_enter!("Epoll::epoll_add"); 180f857971dSopenharmony_ci let mut ev = libc::epoll_event { 181f857971dSopenharmony_ci events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR, 182f857971dSopenharmony_ci u64: fd as u64, 183f857971dSopenharmony_ci }; 184f857971dSopenharmony_ci // SAFETY: 185f857971dSopenharmony_ci // The epoll API is multi-thread safe. 186f857971dSopenharmony_ci // We have carefully ensure that parameters are as required by system interface. 187f857971dSopenharmony_ci let ret = unsafe { 188f857971dSopenharmony_ci libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut ev) 189f857971dSopenharmony_ci }; 190f857971dSopenharmony_ci if ret != EPOLL_SUCCESS { 191f857971dSopenharmony_ci error!(LOG_LABEL, "epoll_ctl_add({},{}) fail: {:?}", 192f857971dSopenharmony_ci @public(self.epoll_fd), @public(fd), @public(Error::last_os_error())); 193f857971dSopenharmony_ci Err(FusionErrorCode::Fail) 194f857971dSopenharmony_ci } else { 195f857971dSopenharmony_ci Ok(()) 196f857971dSopenharmony_ci } 197f857971dSopenharmony_ci } 198f857971dSopenharmony_ci 199f857971dSopenharmony_ci fn epoll_del(&self, fd: RawFd) -> FusionResult<()> 200f857971dSopenharmony_ci { 201f857971dSopenharmony_ci call_debug_enter!("Epoll::epoll_del"); 202f857971dSopenharmony_ci // SAFETY: 203f857971dSopenharmony_ci // The epoll API is multi-thread safe. 204f857971dSopenharmony_ci // We have carefully ensure that parameters are as required by system interface. 205f857971dSopenharmony_ci let ret = unsafe { 206f857971dSopenharmony_ci libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut()) 207f857971dSopenharmony_ci }; 208f857971dSopenharmony_ci if ret != EPOLL_SUCCESS { 209f857971dSopenharmony_ci error!(LOG_LABEL, "epoll_ctl_remove({},{}) fail: {:?}", 210f857971dSopenharmony_ci @public(self.epoll_fd), @public(fd), @public(Error::last_os_error())); 211f857971dSopenharmony_ci Err(FusionErrorCode::Fail) 212f857971dSopenharmony_ci } else { 213f857971dSopenharmony_ci Ok(()) 214f857971dSopenharmony_ci } 215f857971dSopenharmony_ci } 216f857971dSopenharmony_ci 217f857971dSopenharmony_ci fn epoll_wait(&self) -> Option<Vec<EpollEvent>> 218f857971dSopenharmony_ci { 219f857971dSopenharmony_ci call_debug_enter!("Epoll::epoll_wait"); 220f857971dSopenharmony_ci let mut events: Vec<libc::epoll_event> = Vec::with_capacity(MAX_EPOLL_EVENTS as usize); 221f857971dSopenharmony_ci // SAFETY: 222f857971dSopenharmony_ci // The epoll API is multi-thread safe. 223f857971dSopenharmony_ci // We have carefully ensure that parameters are as required by system interface. 224f857971dSopenharmony_ci let ret = unsafe { 225f857971dSopenharmony_ci libc::epoll_wait(self.epoll_fd, events.as_mut_ptr(), MAX_EPOLL_EVENTS, NO_TIMEOUT) 226f857971dSopenharmony_ci }; 227f857971dSopenharmony_ci if ret < 0 { 228f857971dSopenharmony_ci error!(LOG_LABEL, "epoll_wait({}) fail: {:?}", 229f857971dSopenharmony_ci @public(self.epoll_fd), 230f857971dSopenharmony_ci @public(Error::last_os_error())); 231f857971dSopenharmony_ci return None; 232f857971dSopenharmony_ci } 233f857971dSopenharmony_ci let num_of_events = ret as usize; 234f857971dSopenharmony_ci // SAFETY: 235f857971dSopenharmony_ci // `epoll_wait` returns the number of events and promise it is within the limit of 236f857971dSopenharmony_ci // `MAX_EPOLL_EVENTS`. 237f857971dSopenharmony_ci let epoll_events = unsafe { 238f857971dSopenharmony_ci std::slice::from_raw_parts(events.as_ptr(), num_of_events) 239f857971dSopenharmony_ci }; 240f857971dSopenharmony_ci let epoll_events: Vec<EpollEvent> = epoll_events.iter().map(|e| { 241f857971dSopenharmony_ci EpollEvent { 242f857971dSopenharmony_ci fd: e.u64 as RawFd, 243f857971dSopenharmony_ci events: e.events, 244f857971dSopenharmony_ci } 245f857971dSopenharmony_ci }).collect(); 246f857971dSopenharmony_ci Some(epoll_events) 247f857971dSopenharmony_ci } 248f857971dSopenharmony_ci 249f857971dSopenharmony_ci fn epoll_reset(&self, fd: RawFd) -> FusionResult<()> 250f857971dSopenharmony_ci { 251f857971dSopenharmony_ci call_debug_enter!("Epoll::epoll_reset"); 252f857971dSopenharmony_ci let mut ev = libc::epoll_event { 253f857971dSopenharmony_ci events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR, 254f857971dSopenharmony_ci u64: fd as u64, 255f857971dSopenharmony_ci }; 256f857971dSopenharmony_ci // SAFETY: 257f857971dSopenharmony_ci // The epoll API is multi-thread safe. 258f857971dSopenharmony_ci // We have carefully ensure that parameters are as required by system interface. 259f857971dSopenharmony_ci let ret = unsafe { 260f857971dSopenharmony_ci libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut ev) 261f857971dSopenharmony_ci }; 262f857971dSopenharmony_ci if ret != EPOLL_SUCCESS { 263f857971dSopenharmony_ci error!(LOG_LABEL, "In reset_fd, epoll_ctl_mod({},{}) fail: {:?}", 264f857971dSopenharmony_ci @public(self.epoll_fd), @public(fd), @public(Error::last_os_error())); 265f857971dSopenharmony_ci Err(FusionErrorCode::Fail) 266f857971dSopenharmony_ci } else { 267f857971dSopenharmony_ci Ok(()) 268f857971dSopenharmony_ci } 269f857971dSopenharmony_ci } 270f857971dSopenharmony_ci 271f857971dSopenharmony_ci fn add_epoll_handler(&self, fd: RawFd, epoll_handler: EpollHandler) 272f857971dSopenharmony_ci -> FusionResult<Arc<dyn IEpollHandler>> 273f857971dSopenharmony_ci { 274f857971dSopenharmony_ci call_debug_enter!("Epoll::add_epoll_handler"); 275f857971dSopenharmony_ci let mut guard = self.handlers.lock().unwrap(); 276f857971dSopenharmony_ci if guard.contains_key(&fd) { 277f857971dSopenharmony_ci error!(LOG_LABEL, "Epoll handler ({}) has been added", @public(fd)); 278f857971dSopenharmony_ci return Err(FusionErrorCode::Fail); 279f857971dSopenharmony_ci } 280f857971dSopenharmony_ci debug!(LOG_LABEL, "Add epoll handler ({})", @public(fd)); 281f857971dSopenharmony_ci let raw = epoll_handler.raw_handler(); 282f857971dSopenharmony_ci guard.insert(fd, epoll_handler); 283f857971dSopenharmony_ci let _ = self.epoll_add(fd); 284f857971dSopenharmony_ci Ok(raw) 285f857971dSopenharmony_ci } 286f857971dSopenharmony_ci 287f857971dSopenharmony_ci fn remove_epoll_handler(&self, fd: RawFd) -> FusionResult<Arc<dyn IEpollHandler>> 288f857971dSopenharmony_ci { 289f857971dSopenharmony_ci call_debug_enter!("Epoll::remove_epoll_handler"); 290f857971dSopenharmony_ci let mut guard = self.handlers.lock().unwrap(); 291f857971dSopenharmony_ci let _ = self.epoll_del(fd); 292f857971dSopenharmony_ci if let Some(h) = guard.remove(&fd) { 293f857971dSopenharmony_ci debug!(LOG_LABEL, "Remove epoll handler ({})", @public(fd)); 294f857971dSopenharmony_ci Ok(h.raw_handler()) 295f857971dSopenharmony_ci } else { 296f857971dSopenharmony_ci error!(LOG_LABEL, "No epoll handler ({})", @public(fd)); 297f857971dSopenharmony_ci Err(FusionErrorCode::Fail) 298f857971dSopenharmony_ci } 299f857971dSopenharmony_ci } 300f857971dSopenharmony_ci 301f857971dSopenharmony_ci fn wake(&self, events: &[EpollEvent]) 302f857971dSopenharmony_ci { 303f857971dSopenharmony_ci call_debug_enter!("Epoll::wake"); 304f857971dSopenharmony_ci let mut guard = self.handlers.lock().unwrap(); 305f857971dSopenharmony_ci for e in events { 306f857971dSopenharmony_ci if let Some(handler) = guard.get_mut(&e.fd) { 307f857971dSopenharmony_ci debug!(LOG_LABEL, "Wake epoll handler ({})", @public(e.fd)); 308f857971dSopenharmony_ci handler.events = e.events; 309f857971dSopenharmony_ci if let Some(waker) = &handler.waker { 310f857971dSopenharmony_ci waker.wake_by_ref(); 311f857971dSopenharmony_ci } 312f857971dSopenharmony_ci } else { 313f857971dSopenharmony_ci error!(LOG_LABEL, "No epoll handler ({})", @public(e.fd)); 314f857971dSopenharmony_ci } 315f857971dSopenharmony_ci } 316f857971dSopenharmony_ci } 317f857971dSopenharmony_ci 318f857971dSopenharmony_ci fn dispatch_inner(&self, fd: RawFd, waker: &Waker) -> Option<(Arc<dyn IEpollHandler>, u32)> 319f857971dSopenharmony_ci { 320f857971dSopenharmony_ci call_debug_enter!("Epoll::dispatch_inner"); 321f857971dSopenharmony_ci let mut guard = self.handlers.lock().unwrap(); 322f857971dSopenharmony_ci if let Some(handler) = guard.get_mut(&fd) { 323f857971dSopenharmony_ci handler.set_waker(waker); 324f857971dSopenharmony_ci let events = handler.take_events() & LIBC_EPOLLALL; 325f857971dSopenharmony_ci if events != LIBC_EPOLLNONE { 326f857971dSopenharmony_ci Some((handler.raw_handler(), events)) 327f857971dSopenharmony_ci } else { 328f857971dSopenharmony_ci debug!(LOG_LABEL, "No epoll event"); 329f857971dSopenharmony_ci None 330f857971dSopenharmony_ci } 331f857971dSopenharmony_ci } else { 332f857971dSopenharmony_ci error!(LOG_LABEL, "No epoll handler with ({})", @public(fd)); 333f857971dSopenharmony_ci None 334f857971dSopenharmony_ci } 335f857971dSopenharmony_ci } 336f857971dSopenharmony_ci 337f857971dSopenharmony_ci fn dispatch(&self, fd: RawFd, waker: &Waker) 338f857971dSopenharmony_ci { 339f857971dSopenharmony_ci call_debug_enter!("Epoll::dispatch"); 340f857971dSopenharmony_ci if let Some((handler, events)) = self.dispatch_inner(fd, waker) { 341f857971dSopenharmony_ci handler.dispatch(events); 342f857971dSopenharmony_ci let _ = self.epoll_reset(fd); 343f857971dSopenharmony_ci } 344f857971dSopenharmony_ci } 345f857971dSopenharmony_ci} 346f857971dSopenharmony_ci 347f857971dSopenharmony_ciimpl Default for Epoll { 348f857971dSopenharmony_ci fn default() -> Self 349f857971dSopenharmony_ci { 350f857971dSopenharmony_ci Self::new() 351f857971dSopenharmony_ci } 352f857971dSopenharmony_ci} 353f857971dSopenharmony_ci 354f857971dSopenharmony_ciimpl Drop for Epoll { 355f857971dSopenharmony_ci fn drop(&mut self) 356f857971dSopenharmony_ci { 357f857971dSopenharmony_ci // SAFETY: 358f857971dSopenharmony_ci // Parameter is as required by system, so consider it safe here. 359f857971dSopenharmony_ci let ret = unsafe { libc::close(self.epoll_fd) }; 360f857971dSopenharmony_ci if ret != 0 { 361f857971dSopenharmony_ci error!(LOG_LABEL, "close({}) fail: {:?}", 362f857971dSopenharmony_ci @public(self.epoll_fd), 363f857971dSopenharmony_ci @public(Error::last_os_error())); 364f857971dSopenharmony_ci } 365f857971dSopenharmony_ci } 366f857971dSopenharmony_ci} 367f857971dSopenharmony_ci 368f857971dSopenharmony_cistruct EpollHandlerFuture { 369f857971dSopenharmony_ci fd: RawFd, 370f857971dSopenharmony_ci epoll: Arc<Epoll>, 371f857971dSopenharmony_ci} 372f857971dSopenharmony_ci 373f857971dSopenharmony_ciimpl EpollHandlerFuture { 374f857971dSopenharmony_ci fn new(fd: RawFd, epoll: Arc<Epoll>) -> Self 375f857971dSopenharmony_ci { 376f857971dSopenharmony_ci Self { fd, epoll } 377f857971dSopenharmony_ci } 378f857971dSopenharmony_ci} 379f857971dSopenharmony_ci 380f857971dSopenharmony_ciimpl Future for EpollHandlerFuture { 381f857971dSopenharmony_ci type Output = (); 382f857971dSopenharmony_ci 383f857971dSopenharmony_ci fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> 384f857971dSopenharmony_ci { 385f857971dSopenharmony_ci call_debug_enter!("EpollHandlerFuture::poll"); 386f857971dSopenharmony_ci self.epoll.dispatch(self.fd, cx.waker()); 387f857971dSopenharmony_ci Poll::Pending 388f857971dSopenharmony_ci } 389f857971dSopenharmony_ci} 390f857971dSopenharmony_ci 391f857971dSopenharmony_cistruct EpollWaker { 392f857971dSopenharmony_ci fds: [RawFd; 2], 393f857971dSopenharmony_ci} 394f857971dSopenharmony_ci 395f857971dSopenharmony_ciimpl EpollWaker { 396f857971dSopenharmony_ci fn new() -> Self 397f857971dSopenharmony_ci { 398f857971dSopenharmony_ci let mut fds: [c_int; 2] = [-1; 2]; 399f857971dSopenharmony_ci // SAFETY: 400f857971dSopenharmony_ci // The pipe API is multi-thread safe. 401f857971dSopenharmony_ci // We have carefully checked that parameters are as required by system interface. 402f857971dSopenharmony_ci let ret = unsafe { 403f857971dSopenharmony_ci libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) 404f857971dSopenharmony_ci }; 405f857971dSopenharmony_ci if ret != 0 { 406f857971dSopenharmony_ci error!(LOG_LABEL, "pipe2 fail: {:?}", @public(Error::last_os_error())); 407f857971dSopenharmony_ci } 408f857971dSopenharmony_ci Self { fds } 409f857971dSopenharmony_ci } 410f857971dSopenharmony_ci 411f857971dSopenharmony_ci fn wake(&self) 412f857971dSopenharmony_ci { 413f857971dSopenharmony_ci call_debug_enter!("EpollWaker::wake"); 414f857971dSopenharmony_ci let data: i32 = 0; 415f857971dSopenharmony_ci // SAFETY: 416f857971dSopenharmony_ci // We have carefully checked that parameters are as required by system interface. 417f857971dSopenharmony_ci let ret = unsafe { 418f857971dSopenharmony_ci libc::write(self.fds[1], 419f857971dSopenharmony_ci std::ptr::addr_of!(data) as *const c_void, 420f857971dSopenharmony_ci std::mem::size_of_val(&data)) 421f857971dSopenharmony_ci }; 422f857971dSopenharmony_ci if ret == SYSTEM_IO_FAILURE { 423f857971dSopenharmony_ci error!(LOG_LABEL, "write fail: {:?}", @public(Error::last_os_error())); 424f857971dSopenharmony_ci } 425f857971dSopenharmony_ci } 426f857971dSopenharmony_ci} 427f857971dSopenharmony_ci 428f857971dSopenharmony_ciimpl IEpollHandler for EpollWaker { 429f857971dSopenharmony_ci fn fd(&self) -> RawFd 430f857971dSopenharmony_ci { 431f857971dSopenharmony_ci self.fds[0] 432f857971dSopenharmony_ci } 433f857971dSopenharmony_ci 434f857971dSopenharmony_ci fn dispatch(&self, events: u32) 435f857971dSopenharmony_ci { 436f857971dSopenharmony_ci if (events & LIBC_EPOLLIN) == LIBC_EPOLLIN { 437f857971dSopenharmony_ci let data: i32 = 0; 438f857971dSopenharmony_ci // SAFETY: 439f857971dSopenharmony_ci // Parameters are as required by system and business logic, so it can be trusted. 440f857971dSopenharmony_ci let ret = unsafe { 441f857971dSopenharmony_ci libc::read(self.fd(), 442f857971dSopenharmony_ci std::ptr::addr_of!(data) as *mut c_void, 443f857971dSopenharmony_ci std::mem::size_of_val(&data)) 444f857971dSopenharmony_ci }; 445f857971dSopenharmony_ci if ret == SYSTEM_IO_FAILURE { 446f857971dSopenharmony_ci error!(LOG_LABEL, "read fail: {:?}", @public(Error::last_os_error())); 447f857971dSopenharmony_ci } 448f857971dSopenharmony_ci } 449f857971dSopenharmony_ci } 450f857971dSopenharmony_ci} 451f857971dSopenharmony_ci 452f857971dSopenharmony_ciimpl Default for EpollWaker { 453f857971dSopenharmony_ci fn default() -> Self 454f857971dSopenharmony_ci { 455f857971dSopenharmony_ci Self::new() 456f857971dSopenharmony_ci } 457f857971dSopenharmony_ci} 458f857971dSopenharmony_ci 459f857971dSopenharmony_ciimpl Drop for EpollWaker { 460f857971dSopenharmony_ci fn drop(&mut self) 461f857971dSopenharmony_ci { 462f857971dSopenharmony_ci for fd in &mut self.fds { 463f857971dSopenharmony_ci if *fd != INVALID_FD { 464f857971dSopenharmony_ci // SAFETY: 465f857971dSopenharmony_ci // Parameter is as required by system, so consider it safe here. 466f857971dSopenharmony_ci let ret = unsafe { libc::close(*fd) }; 467f857971dSopenharmony_ci if ret != EPOLL_SUCCESS { 468f857971dSopenharmony_ci error!(LOG_LABEL, "close({}) fail: {:?}", 469f857971dSopenharmony_ci @public(*fd), 470f857971dSopenharmony_ci @public(Error::last_os_error())); 471f857971dSopenharmony_ci } 472f857971dSopenharmony_ci } 473f857971dSopenharmony_ci } 474f857971dSopenharmony_ci } 475f857971dSopenharmony_ci} 476f857971dSopenharmony_ci 477f857971dSopenharmony_ci/// Bookkeeping of epoll handling. 478f857971dSopenharmony_cipub struct Scheduler { 479f857971dSopenharmony_ci epoll: Arc<Epoll>, 480f857971dSopenharmony_ci epoll_waker: Arc<EpollWaker>, 481f857971dSopenharmony_ci is_running: Arc<AtomicBool>, 482f857971dSopenharmony_ci join_handle: Option<std::thread::JoinHandle<()>>, 483f857971dSopenharmony_ci} 484f857971dSopenharmony_ci 485f857971dSopenharmony_ciimpl Scheduler { 486f857971dSopenharmony_ci pub(crate) fn new() -> Self 487f857971dSopenharmony_ci { 488f857971dSopenharmony_ci call_debug_enter!("Scheduler::new"); 489f857971dSopenharmony_ci let epoll: Arc<Epoll> = Arc::default(); 490f857971dSopenharmony_ci let is_running = Arc::new(AtomicBool::new(true)); 491f857971dSopenharmony_ci let driver = Driver::new(epoll.clone(), is_running.clone()); 492f857971dSopenharmony_ci let join_handle = std::thread::spawn(move || { 493f857971dSopenharmony_ci driver.run(); 494f857971dSopenharmony_ci }); 495f857971dSopenharmony_ci let scheduler = Self { 496f857971dSopenharmony_ci epoll, 497f857971dSopenharmony_ci epoll_waker: Arc::default(), 498f857971dSopenharmony_ci is_running, 499f857971dSopenharmony_ci join_handle: Some(join_handle), 500f857971dSopenharmony_ci }; 501f857971dSopenharmony_ci let _ = scheduler.add_epoll_handler(scheduler.epoll_waker.clone()); 502f857971dSopenharmony_ci scheduler 503f857971dSopenharmony_ci } 504f857971dSopenharmony_ci 505f857971dSopenharmony_ci pub(crate) fn add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) 506f857971dSopenharmony_ci -> FusionResult<Arc<dyn IEpollHandler>> 507f857971dSopenharmony_ci { 508f857971dSopenharmony_ci call_debug_enter!("Scheduler::add_epoll_handler"); 509f857971dSopenharmony_ci let fd: RawFd = handler.fd(); 510f857971dSopenharmony_ci let join_handle = ylong_runtime::spawn( 511f857971dSopenharmony_ci EpollHandlerFuture::new(fd, self.epoll.clone()) 512f857971dSopenharmony_ci ); 513f857971dSopenharmony_ci self.epoll.add_epoll_handler(fd, EpollHandler::new(handler, join_handle)) 514f857971dSopenharmony_ci } 515f857971dSopenharmony_ci 516f857971dSopenharmony_ci pub(crate) fn remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) 517f857971dSopenharmony_ci -> FusionResult<Arc<dyn IEpollHandler>> 518f857971dSopenharmony_ci { 519f857971dSopenharmony_ci call_debug_enter!("Scheduler::remove_epoll_handler"); 520f857971dSopenharmony_ci self.epoll.remove_epoll_handler(handler.fd()) 521f857971dSopenharmony_ci } 522f857971dSopenharmony_ci} 523f857971dSopenharmony_ci 524f857971dSopenharmony_ciimpl Default for Scheduler { 525f857971dSopenharmony_ci fn default() -> Self 526f857971dSopenharmony_ci { 527f857971dSopenharmony_ci Self::new() 528f857971dSopenharmony_ci } 529f857971dSopenharmony_ci} 530f857971dSopenharmony_ci 531f857971dSopenharmony_ciimpl Drop for Scheduler { 532f857971dSopenharmony_ci fn drop(&mut self) 533f857971dSopenharmony_ci { 534f857971dSopenharmony_ci call_debug_enter!("Scheduler::drop"); 535f857971dSopenharmony_ci self.is_running.store(false, Ordering::Relaxed); 536f857971dSopenharmony_ci self.epoll_waker.wake(); 537f857971dSopenharmony_ci if let Some(join_handle) = self.join_handle.take() { 538f857971dSopenharmony_ci let _ = join_handle.join(); 539f857971dSopenharmony_ci } 540f857971dSopenharmony_ci } 541f857971dSopenharmony_ci} 542