1/*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16//! Implementation of epoll event loop.
17
18#![allow(dead_code)]
19#![allow(unused_variables)]
20
21use std::collections::BTreeMap;
22use std::ffi::{ c_void, c_char, c_int, CString };
23use std::future::Future;
24use std::io::Error;
25use std::os::fd::RawFd;
26use std::pin::Pin;
27use std::sync::{ Arc, Mutex };
28use std::sync::atomic::{ AtomicBool, Ordering };
29use std::task::{ Context, Poll, Waker };
30use fusion_utils_rust::{ call_debug_enter, FusionErrorCode, FusionResult };
31use hilog_rust::{ debug, info, error, hilog, HiLogLabel, LogType };
32
33/// Indicating data other than high-priority data can be read.
34pub const LIBC_EPOLLIN: u32 = libc::EPOLLIN as u32;
35const LIBC_EPOLLONESHOT: u32 = libc::EPOLLONESHOT as u32;
36/// Indicating an error has occurred.
37pub const LIBC_EPOLLERR: u32 = libc::EPOLLERR as u32;
38/// Indicating a hangup has occurred.
39pub const LIBC_EPOLLHUP: u32 = libc::EPOLLHUP as u32;
40const LIBC_EPOLLALL: u32 = LIBC_EPOLLIN | LIBC_EPOLLERR | LIBC_EPOLLHUP;
41const LIBC_EPOLLNONE: u32 = 0;
42const MAX_EPOLL_EVENTS: c_int = 128;
43const EPOLL_SUCCESS: c_int = 0;
44const EPOLL_FAILURE: c_int = -1;
45const NO_TIMEOUT: c_int = -1;
46const SYSTEM_IO_FAILURE: libc::ssize_t = -1;
47const INVALID_FD: RawFd = -1;
48const LOG_LABEL: HiLogLabel = HiLogLabel {
49    log_type: LogType::LogCore,
50    domain: 0xD002220,
51    tag: "Scheduler",
52};
53
54/// Abstraction of epoll handler.
55pub trait IEpollHandler: Send + Sync {
56    /// Return file descriptor of this epoll handler.
57    fn fd(&self) -> RawFd;
58    /// Dispatch epoll events to this epoll handler.
59    fn dispatch(&self, events: u32);
60}
61
62struct EpollEvent {
63    fd: RawFd,
64    events: u32,
65}
66
67struct EpollHandler {
68    raw: Arc<dyn IEpollHandler>,
69    handle: ylong_runtime::task::JoinHandle<()>,
70    waker: Option<Waker>,
71    events: u32,
72}
73
74impl EpollHandler {
75    fn new(raw: Arc<dyn IEpollHandler>, handle: ylong_runtime::task::JoinHandle<()>) -> Self
76    {
77        Self {
78            raw,
79            handle,
80            waker: Default::default(),
81            events: Default::default(),
82        }
83    }
84
85    #[inline]
86    fn fd(&self) -> RawFd
87    {
88        self.raw.fd()
89    }
90
91    #[inline]
92    fn raw_handler(&self) -> Arc<dyn IEpollHandler>
93    {
94        self.raw.clone()
95    }
96
97    #[inline]
98    fn set_waker(&mut self, waker: &Waker)
99    {
100        self.waker.replace(waker.clone());
101    }
102
103    #[inline]
104    fn take_events(&mut self) -> u32
105    {
106        let events = self.events;
107        self.events = Default::default();
108        events
109    }
110}
111
112impl Drop for EpollHandler {
113    fn drop(&mut self)
114    {
115        self.handle.cancel();
116    }
117}
118
119/// `Driver` encapsulate event loop of epoll.
120struct Driver {
121    epoll: Arc<Epoll>,
122    is_running: Arc<AtomicBool>,
123}
124
125impl Driver {
126    fn new(epoll: Arc<Epoll>, is_running: Arc<AtomicBool>) -> Self
127    {
128        Self { epoll, is_running }
129    }
130
131    #[inline]
132    fn is_running(&self) -> bool
133    {
134        self.is_running.load(Ordering::Relaxed)
135    }
136
137    fn run(&self)
138    {
139        call_debug_enter!("Driver::run");
140        while self.is_running() {
141            if let Some(epoll_events) = self.epoll.epoll_wait() {
142                if !self.is_running() {
143                    info!(LOG_LABEL, "Driver stopped running");
144                    break;
145                }
146                self.epoll.wake(&epoll_events);
147            }
148        }
149    }
150}
151
152struct Epoll {
153    epoll_fd: RawFd,
154    handlers: Mutex<BTreeMap<RawFd, EpollHandler>>,
155}
156
157impl Epoll {
158    fn new() -> Self
159    {
160        // SAFETY:
161        // The epoll API is multi-thread safe.
162        // This is a normal system call, no safety pitfall.
163        let epoll_fd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
164        assert_ne!(epoll_fd, INVALID_FD, "epoll_create1 fail: {:?}", Error::last_os_error());
165        Self {
166            epoll_fd,
167            handlers: Mutex::default(),
168        }
169    }
170
171    #[inline]
172    fn fd(&self) -> RawFd
173    {
174        self.epoll_fd
175    }
176
177    fn epoll_add(&self, fd: RawFd) -> FusionResult<()>
178    {
179        call_debug_enter!("Epoll::epoll_add");
180        let mut ev = libc::epoll_event {
181            events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR,
182            u64: fd as u64,
183        };
184        // SAFETY:
185        // The epoll API is multi-thread safe.
186        // We have carefully ensure that parameters are as required by system interface.
187        let ret = unsafe {
188            libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut ev)
189        };
190        if ret != EPOLL_SUCCESS {
191            error!(LOG_LABEL, "epoll_ctl_add({},{}) fail: {:?}",
192                   @public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
193            Err(FusionErrorCode::Fail)
194        } else {
195            Ok(())
196        }
197    }
198
199    fn epoll_del(&self, fd: RawFd) -> FusionResult<()>
200    {
201        call_debug_enter!("Epoll::epoll_del");
202        // SAFETY:
203        // The epoll API is multi-thread safe.
204        // We have carefully ensure that parameters are as required by system interface.
205        let ret = unsafe {
206            libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut())
207        };
208        if ret != EPOLL_SUCCESS {
209            error!(LOG_LABEL, "epoll_ctl_remove({},{}) fail: {:?}",
210                   @public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
211            Err(FusionErrorCode::Fail)
212        } else {
213            Ok(())
214        }
215    }
216
217    fn epoll_wait(&self) -> Option<Vec<EpollEvent>>
218    {
219        call_debug_enter!("Epoll::epoll_wait");
220        let mut events: Vec<libc::epoll_event> = Vec::with_capacity(MAX_EPOLL_EVENTS as usize);
221        // SAFETY:
222        // The epoll API is multi-thread safe.
223        // We have carefully ensure that parameters are as required by system interface.
224        let ret = unsafe {
225            libc::epoll_wait(self.epoll_fd, events.as_mut_ptr(), MAX_EPOLL_EVENTS, NO_TIMEOUT)
226        };
227        if ret < 0 {
228            error!(LOG_LABEL, "epoll_wait({}) fail: {:?}",
229                   @public(self.epoll_fd),
230                   @public(Error::last_os_error()));
231            return None;
232        }
233        let num_of_events = ret as usize;
234        // SAFETY:
235        // `epoll_wait` returns the number of events and promise it is within the limit of
236        // `MAX_EPOLL_EVENTS`.
237        let epoll_events = unsafe {
238            std::slice::from_raw_parts(events.as_ptr(), num_of_events)
239        };
240        let epoll_events: Vec<EpollEvent> = epoll_events.iter().map(|e| {
241            EpollEvent {
242                fd: e.u64 as RawFd,
243                events: e.events,
244            }
245        }).collect();
246        Some(epoll_events)
247    }
248
249    fn epoll_reset(&self, fd: RawFd) -> FusionResult<()>
250    {
251        call_debug_enter!("Epoll::epoll_reset");
252        let mut ev = libc::epoll_event {
253            events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR,
254            u64: fd as u64,
255        };
256        // SAFETY:
257        // The epoll API is multi-thread safe.
258        // We have carefully ensure that parameters are as required by system interface.
259        let ret = unsafe {
260            libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut ev)
261        };
262        if ret != EPOLL_SUCCESS {
263            error!(LOG_LABEL, "In reset_fd, epoll_ctl_mod({},{}) fail: {:?}",
264                   @public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
265            Err(FusionErrorCode::Fail)
266        } else {
267            Ok(())
268        }
269    }
270
271    fn add_epoll_handler(&self, fd: RawFd, epoll_handler: EpollHandler)
272        -> FusionResult<Arc<dyn IEpollHandler>>
273    {
274        call_debug_enter!("Epoll::add_epoll_handler");
275        let mut guard = self.handlers.lock().unwrap();
276        if guard.contains_key(&fd) {
277            error!(LOG_LABEL, "Epoll handler ({}) has been added", @public(fd));
278            return Err(FusionErrorCode::Fail);
279        }
280        debug!(LOG_LABEL, "Add epoll handler ({})", @public(fd));
281        let raw = epoll_handler.raw_handler();
282        guard.insert(fd, epoll_handler);
283        let _ = self.epoll_add(fd);
284        Ok(raw)
285    }
286
287    fn remove_epoll_handler(&self, fd: RawFd) -> FusionResult<Arc<dyn IEpollHandler>>
288    {
289        call_debug_enter!("Epoll::remove_epoll_handler");
290        let mut guard = self.handlers.lock().unwrap();
291        let _ = self.epoll_del(fd);
292        if let Some(h) = guard.remove(&fd) {
293            debug!(LOG_LABEL, "Remove epoll handler ({})", @public(fd));
294            Ok(h.raw_handler())
295        } else {
296            error!(LOG_LABEL, "No epoll handler ({})", @public(fd));
297            Err(FusionErrorCode::Fail)
298        }
299    }
300
301    fn wake(&self, events: &[EpollEvent])
302    {
303        call_debug_enter!("Epoll::wake");
304        let mut guard = self.handlers.lock().unwrap();
305        for e in events {
306            if let Some(handler) = guard.get_mut(&e.fd) {
307                debug!(LOG_LABEL, "Wake epoll handler ({})", @public(e.fd));
308                handler.events = e.events;
309                if let Some(waker) = &handler.waker {
310                    waker.wake_by_ref();
311                }
312            } else {
313                error!(LOG_LABEL, "No epoll handler ({})", @public(e.fd));
314            }
315        }
316    }
317
318    fn dispatch_inner(&self, fd: RawFd, waker: &Waker) -> Option<(Arc<dyn IEpollHandler>, u32)>
319    {
320        call_debug_enter!("Epoll::dispatch_inner");
321        let mut guard = self.handlers.lock().unwrap();
322        if let Some(handler) = guard.get_mut(&fd) {
323            handler.set_waker(waker);
324            let events = handler.take_events() & LIBC_EPOLLALL;
325            if events != LIBC_EPOLLNONE {
326                Some((handler.raw_handler(), events))
327            } else {
328                debug!(LOG_LABEL, "No epoll event");
329                None
330            }
331        } else {
332            error!(LOG_LABEL, "No epoll handler with ({})", @public(fd));
333            None
334        }
335    }
336
337    fn dispatch(&self, fd: RawFd, waker: &Waker)
338    {
339        call_debug_enter!("Epoll::dispatch");
340        if let Some((handler, events)) = self.dispatch_inner(fd, waker) {
341            handler.dispatch(events);
342            let _ = self.epoll_reset(fd);
343        }
344    }
345}
346
347impl Default for Epoll {
348    fn default() -> Self
349    {
350        Self::new()
351    }
352}
353
354impl Drop for Epoll {
355    fn drop(&mut self)
356    {
357        // SAFETY:
358        // Parameter is as required by system, so consider it safe here.
359        let ret = unsafe { libc::close(self.epoll_fd) };
360        if ret != 0 {
361            error!(LOG_LABEL, "close({}) fail: {:?}",
362                   @public(self.epoll_fd),
363                   @public(Error::last_os_error()));
364        }
365    }
366}
367
368struct EpollHandlerFuture {
369    fd: RawFd,
370    epoll: Arc<Epoll>,
371}
372
373impl EpollHandlerFuture {
374    fn new(fd: RawFd, epoll: Arc<Epoll>) -> Self
375    {
376        Self { fd, epoll }
377    }
378}
379
380impl Future for EpollHandlerFuture {
381    type Output = ();
382
383    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
384    {
385        call_debug_enter!("EpollHandlerFuture::poll");
386        self.epoll.dispatch(self.fd, cx.waker());
387        Poll::Pending
388    }
389}
390
391struct EpollWaker {
392    fds: [RawFd; 2],
393}
394
395impl EpollWaker {
396    fn new() -> Self
397    {
398        let mut fds: [c_int; 2] = [-1; 2];
399        // SAFETY:
400        // The pipe API is multi-thread safe.
401        // We have carefully checked that parameters are as required by system interface.
402        let ret = unsafe {
403            libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK)
404        };
405        if ret != 0 {
406            error!(LOG_LABEL, "pipe2 fail: {:?}", @public(Error::last_os_error()));
407        }
408        Self { fds }
409    }
410
411    fn wake(&self)
412    {
413        call_debug_enter!("EpollWaker::wake");
414        let data: i32 = 0;
415        // SAFETY:
416        // We have carefully checked that parameters are as required by system interface.
417        let ret = unsafe {
418            libc::write(self.fds[1],
419                        std::ptr::addr_of!(data) as *const c_void,
420                        std::mem::size_of_val(&data))
421        };
422        if ret == SYSTEM_IO_FAILURE {
423            error!(LOG_LABEL, "write fail: {:?}", @public(Error::last_os_error()));
424        }
425    }
426}
427
428impl IEpollHandler for EpollWaker {
429    fn fd(&self) -> RawFd
430    {
431        self.fds[0]
432    }
433
434    fn dispatch(&self, events: u32)
435    {
436        if (events & LIBC_EPOLLIN) == LIBC_EPOLLIN {
437            let data: i32 = 0;
438            // SAFETY:
439            // Parameters are as required by system and business logic, so it can be trusted.
440            let ret = unsafe {
441                libc::read(self.fd(),
442                           std::ptr::addr_of!(data) as *mut c_void,
443                           std::mem::size_of_val(&data))
444            };
445            if ret == SYSTEM_IO_FAILURE {
446                error!(LOG_LABEL, "read fail: {:?}", @public(Error::last_os_error()));
447            }
448        }
449    }
450}
451
452impl Default for EpollWaker {
453    fn default() -> Self
454    {
455        Self::new()
456    }
457}
458
459impl Drop for EpollWaker {
460    fn drop(&mut self)
461    {
462        for fd in &mut self.fds {
463            if *fd != INVALID_FD {
464                // SAFETY:
465                // Parameter is as required by system, so consider it safe here.
466                let ret = unsafe { libc::close(*fd) };
467                if ret != EPOLL_SUCCESS {
468                    error!(LOG_LABEL, "close({}) fail: {:?}",
469                           @public(*fd),
470                           @public(Error::last_os_error()));
471                }
472            }
473        }
474    }
475}
476
477/// Bookkeeping of epoll handling.
478pub struct Scheduler {
479    epoll: Arc<Epoll>,
480    epoll_waker: Arc<EpollWaker>,
481    is_running: Arc<AtomicBool>,
482    join_handle: Option<std::thread::JoinHandle<()>>,
483}
484
485impl Scheduler {
486    pub(crate) fn new() -> Self
487    {
488        call_debug_enter!("Scheduler::new");
489        let epoll: Arc<Epoll> = Arc::default();
490        let is_running = Arc::new(AtomicBool::new(true));
491        let driver = Driver::new(epoll.clone(), is_running.clone());
492        let join_handle = std::thread::spawn(move || {
493            driver.run();
494        });
495        let scheduler = Self {
496            epoll,
497            epoll_waker: Arc::default(),
498            is_running,
499            join_handle: Some(join_handle),
500        };
501        let _ = scheduler.add_epoll_handler(scheduler.epoll_waker.clone());
502        scheduler
503    }
504
505    pub(crate) fn add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
506        -> FusionResult<Arc<dyn IEpollHandler>>
507    {
508        call_debug_enter!("Scheduler::add_epoll_handler");
509        let fd: RawFd = handler.fd();
510        let join_handle = ylong_runtime::spawn(
511            EpollHandlerFuture::new(fd, self.epoll.clone())
512        );
513        self.epoll.add_epoll_handler(fd, EpollHandler::new(handler, join_handle))
514    }
515
516    pub(crate) fn remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
517        -> FusionResult<Arc<dyn IEpollHandler>>
518    {
519        call_debug_enter!("Scheduler::remove_epoll_handler");
520        self.epoll.remove_epoll_handler(handler.fd())
521    }
522}
523
524impl Default for Scheduler {
525    fn default() -> Self
526    {
527        Self::new()
528    }
529}
530
531impl Drop for Scheduler {
532    fn drop(&mut self)
533    {
534        call_debug_enter!("Scheduler::drop");
535        self.is_running.store(false, Ordering::Relaxed);
536        self.epoll_waker.wake();
537        if let Some(join_handle) = self.join_handle.take() {
538            let _ = join_handle.join();
539        }
540    }
541}
542