1f857971dSopenharmony_ci/*
2f857971dSopenharmony_ci * Copyright (C) 2023 Huawei Device Co., Ltd.
3f857971dSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4f857971dSopenharmony_ci * you may not use this file except in compliance with the License.
5f857971dSopenharmony_ci * You may obtain a copy of the License at
6f857971dSopenharmony_ci *
7f857971dSopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8f857971dSopenharmony_ci *
9f857971dSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10f857971dSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11f857971dSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12f857971dSopenharmony_ci * See the License for the specific language governing permissions and
13f857971dSopenharmony_ci * limitations under the License.
14f857971dSopenharmony_ci */
15f857971dSopenharmony_ci
16f857971dSopenharmony_ci//! Implementation of epoll event loop.
17f857971dSopenharmony_ci
18f857971dSopenharmony_ci#![allow(dead_code)]
19f857971dSopenharmony_ci#![allow(unused_variables)]
20f857971dSopenharmony_ci
21f857971dSopenharmony_ciuse std::collections::BTreeMap;
22f857971dSopenharmony_ciuse std::ffi::{ c_void, c_char, c_int, CString };
23f857971dSopenharmony_ciuse std::future::Future;
24f857971dSopenharmony_ciuse std::io::Error;
25f857971dSopenharmony_ciuse std::os::fd::RawFd;
26f857971dSopenharmony_ciuse std::pin::Pin;
27f857971dSopenharmony_ciuse std::sync::{ Arc, Mutex };
28f857971dSopenharmony_ciuse std::sync::atomic::{ AtomicBool, Ordering };
29f857971dSopenharmony_ciuse std::task::{ Context, Poll, Waker };
30f857971dSopenharmony_ciuse fusion_utils_rust::{ call_debug_enter, FusionErrorCode, FusionResult };
31f857971dSopenharmony_ciuse hilog_rust::{ debug, info, error, hilog, HiLogLabel, LogType };
32f857971dSopenharmony_ci
33f857971dSopenharmony_ci/// Indicating data other than high-priority data can be read.
34f857971dSopenharmony_cipub const LIBC_EPOLLIN: u32 = libc::EPOLLIN as u32;
35f857971dSopenharmony_ciconst LIBC_EPOLLONESHOT: u32 = libc::EPOLLONESHOT as u32;
36f857971dSopenharmony_ci/// Indicating an error has occurred.
37f857971dSopenharmony_cipub const LIBC_EPOLLERR: u32 = libc::EPOLLERR as u32;
38f857971dSopenharmony_ci/// Indicating a hangup has occurred.
39f857971dSopenharmony_cipub const LIBC_EPOLLHUP: u32 = libc::EPOLLHUP as u32;
40f857971dSopenharmony_ciconst LIBC_EPOLLALL: u32 = LIBC_EPOLLIN | LIBC_EPOLLERR | LIBC_EPOLLHUP;
41f857971dSopenharmony_ciconst LIBC_EPOLLNONE: u32 = 0;
42f857971dSopenharmony_ciconst MAX_EPOLL_EVENTS: c_int = 128;
43f857971dSopenharmony_ciconst EPOLL_SUCCESS: c_int = 0;
44f857971dSopenharmony_ciconst EPOLL_FAILURE: c_int = -1;
45f857971dSopenharmony_ciconst NO_TIMEOUT: c_int = -1;
46f857971dSopenharmony_ciconst SYSTEM_IO_FAILURE: libc::ssize_t = -1;
47f857971dSopenharmony_ciconst INVALID_FD: RawFd = -1;
48f857971dSopenharmony_ciconst LOG_LABEL: HiLogLabel = HiLogLabel {
49f857971dSopenharmony_ci    log_type: LogType::LogCore,
50f857971dSopenharmony_ci    domain: 0xD002220,
51f857971dSopenharmony_ci    tag: "Scheduler",
52f857971dSopenharmony_ci};
53f857971dSopenharmony_ci
54f857971dSopenharmony_ci/// Abstraction of epoll handler.
55f857971dSopenharmony_cipub trait IEpollHandler: Send + Sync {
56f857971dSopenharmony_ci    /// Return file descriptor of this epoll handler.
57f857971dSopenharmony_ci    fn fd(&self) -> RawFd;
58f857971dSopenharmony_ci    /// Dispatch epoll events to this epoll handler.
59f857971dSopenharmony_ci    fn dispatch(&self, events: u32);
60f857971dSopenharmony_ci}
61f857971dSopenharmony_ci
62f857971dSopenharmony_cistruct EpollEvent {
63f857971dSopenharmony_ci    fd: RawFd,
64f857971dSopenharmony_ci    events: u32,
65f857971dSopenharmony_ci}
66f857971dSopenharmony_ci
67f857971dSopenharmony_cistruct EpollHandler {
68f857971dSopenharmony_ci    raw: Arc<dyn IEpollHandler>,
69f857971dSopenharmony_ci    handle: ylong_runtime::task::JoinHandle<()>,
70f857971dSopenharmony_ci    waker: Option<Waker>,
71f857971dSopenharmony_ci    events: u32,
72f857971dSopenharmony_ci}
73f857971dSopenharmony_ci
74f857971dSopenharmony_ciimpl EpollHandler {
75f857971dSopenharmony_ci    fn new(raw: Arc<dyn IEpollHandler>, handle: ylong_runtime::task::JoinHandle<()>) -> Self
76f857971dSopenharmony_ci    {
77f857971dSopenharmony_ci        Self {
78f857971dSopenharmony_ci            raw,
79f857971dSopenharmony_ci            handle,
80f857971dSopenharmony_ci            waker: Default::default(),
81f857971dSopenharmony_ci            events: Default::default(),
82f857971dSopenharmony_ci        }
83f857971dSopenharmony_ci    }
84f857971dSopenharmony_ci
85f857971dSopenharmony_ci    #[inline]
86f857971dSopenharmony_ci    fn fd(&self) -> RawFd
87f857971dSopenharmony_ci    {
88f857971dSopenharmony_ci        self.raw.fd()
89f857971dSopenharmony_ci    }
90f857971dSopenharmony_ci
91f857971dSopenharmony_ci    #[inline]
92f857971dSopenharmony_ci    fn raw_handler(&self) -> Arc<dyn IEpollHandler>
93f857971dSopenharmony_ci    {
94f857971dSopenharmony_ci        self.raw.clone()
95f857971dSopenharmony_ci    }
96f857971dSopenharmony_ci
97f857971dSopenharmony_ci    #[inline]
98f857971dSopenharmony_ci    fn set_waker(&mut self, waker: &Waker)
99f857971dSopenharmony_ci    {
100f857971dSopenharmony_ci        self.waker.replace(waker.clone());
101f857971dSopenharmony_ci    }
102f857971dSopenharmony_ci
103f857971dSopenharmony_ci    #[inline]
104f857971dSopenharmony_ci    fn take_events(&mut self) -> u32
105f857971dSopenharmony_ci    {
106f857971dSopenharmony_ci        let events = self.events;
107f857971dSopenharmony_ci        self.events = Default::default();
108f857971dSopenharmony_ci        events
109f857971dSopenharmony_ci    }
110f857971dSopenharmony_ci}
111f857971dSopenharmony_ci
112f857971dSopenharmony_ciimpl Drop for EpollHandler {
113f857971dSopenharmony_ci    fn drop(&mut self)
114f857971dSopenharmony_ci    {
115f857971dSopenharmony_ci        self.handle.cancel();
116f857971dSopenharmony_ci    }
117f857971dSopenharmony_ci}
118f857971dSopenharmony_ci
119f857971dSopenharmony_ci/// `Driver` encapsulate event loop of epoll.
120f857971dSopenharmony_cistruct Driver {
121f857971dSopenharmony_ci    epoll: Arc<Epoll>,
122f857971dSopenharmony_ci    is_running: Arc<AtomicBool>,
123f857971dSopenharmony_ci}
124f857971dSopenharmony_ci
125f857971dSopenharmony_ciimpl Driver {
126f857971dSopenharmony_ci    fn new(epoll: Arc<Epoll>, is_running: Arc<AtomicBool>) -> Self
127f857971dSopenharmony_ci    {
128f857971dSopenharmony_ci        Self { epoll, is_running }
129f857971dSopenharmony_ci    }
130f857971dSopenharmony_ci
131f857971dSopenharmony_ci    #[inline]
132f857971dSopenharmony_ci    fn is_running(&self) -> bool
133f857971dSopenharmony_ci    {
134f857971dSopenharmony_ci        self.is_running.load(Ordering::Relaxed)
135f857971dSopenharmony_ci    }
136f857971dSopenharmony_ci
137f857971dSopenharmony_ci    fn run(&self)
138f857971dSopenharmony_ci    {
139f857971dSopenharmony_ci        call_debug_enter!("Driver::run");
140f857971dSopenharmony_ci        while self.is_running() {
141f857971dSopenharmony_ci            if let Some(epoll_events) = self.epoll.epoll_wait() {
142f857971dSopenharmony_ci                if !self.is_running() {
143f857971dSopenharmony_ci                    info!(LOG_LABEL, "Driver stopped running");
144f857971dSopenharmony_ci                    break;
145f857971dSopenharmony_ci                }
146f857971dSopenharmony_ci                self.epoll.wake(&epoll_events);
147f857971dSopenharmony_ci            }
148f857971dSopenharmony_ci        }
149f857971dSopenharmony_ci    }
150f857971dSopenharmony_ci}
151f857971dSopenharmony_ci
152f857971dSopenharmony_cistruct Epoll {
153f857971dSopenharmony_ci    epoll_fd: RawFd,
154f857971dSopenharmony_ci    handlers: Mutex<BTreeMap<RawFd, EpollHandler>>,
155f857971dSopenharmony_ci}
156f857971dSopenharmony_ci
157f857971dSopenharmony_ciimpl Epoll {
158f857971dSopenharmony_ci    fn new() -> Self
159f857971dSopenharmony_ci    {
160f857971dSopenharmony_ci        // SAFETY:
161f857971dSopenharmony_ci        // The epoll API is multi-thread safe.
162f857971dSopenharmony_ci        // This is a normal system call, no safety pitfall.
163f857971dSopenharmony_ci        let epoll_fd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
164f857971dSopenharmony_ci        assert_ne!(epoll_fd, INVALID_FD, "epoll_create1 fail: {:?}", Error::last_os_error());
165f857971dSopenharmony_ci        Self {
166f857971dSopenharmony_ci            epoll_fd,
167f857971dSopenharmony_ci            handlers: Mutex::default(),
168f857971dSopenharmony_ci        }
169f857971dSopenharmony_ci    }
170f857971dSopenharmony_ci
171f857971dSopenharmony_ci    #[inline]
172f857971dSopenharmony_ci    fn fd(&self) -> RawFd
173f857971dSopenharmony_ci    {
174f857971dSopenharmony_ci        self.epoll_fd
175f857971dSopenharmony_ci    }
176f857971dSopenharmony_ci
177f857971dSopenharmony_ci    fn epoll_add(&self, fd: RawFd) -> FusionResult<()>
178f857971dSopenharmony_ci    {
179f857971dSopenharmony_ci        call_debug_enter!("Epoll::epoll_add");
180f857971dSopenharmony_ci        let mut ev = libc::epoll_event {
181f857971dSopenharmony_ci            events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR,
182f857971dSopenharmony_ci            u64: fd as u64,
183f857971dSopenharmony_ci        };
184f857971dSopenharmony_ci        // SAFETY:
185f857971dSopenharmony_ci        // The epoll API is multi-thread safe.
186f857971dSopenharmony_ci        // We have carefully ensure that parameters are as required by system interface.
187f857971dSopenharmony_ci        let ret = unsafe {
188f857971dSopenharmony_ci            libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut ev)
189f857971dSopenharmony_ci        };
190f857971dSopenharmony_ci        if ret != EPOLL_SUCCESS {
191f857971dSopenharmony_ci            error!(LOG_LABEL, "epoll_ctl_add({},{}) fail: {:?}",
192f857971dSopenharmony_ci                   @public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
193f857971dSopenharmony_ci            Err(FusionErrorCode::Fail)
194f857971dSopenharmony_ci        } else {
195f857971dSopenharmony_ci            Ok(())
196f857971dSopenharmony_ci        }
197f857971dSopenharmony_ci    }
198f857971dSopenharmony_ci
199f857971dSopenharmony_ci    fn epoll_del(&self, fd: RawFd) -> FusionResult<()>
200f857971dSopenharmony_ci    {
201f857971dSopenharmony_ci        call_debug_enter!("Epoll::epoll_del");
202f857971dSopenharmony_ci        // SAFETY:
203f857971dSopenharmony_ci        // The epoll API is multi-thread safe.
204f857971dSopenharmony_ci        // We have carefully ensure that parameters are as required by system interface.
205f857971dSopenharmony_ci        let ret = unsafe {
206f857971dSopenharmony_ci            libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut())
207f857971dSopenharmony_ci        };
208f857971dSopenharmony_ci        if ret != EPOLL_SUCCESS {
209f857971dSopenharmony_ci            error!(LOG_LABEL, "epoll_ctl_remove({},{}) fail: {:?}",
210f857971dSopenharmony_ci                   @public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
211f857971dSopenharmony_ci            Err(FusionErrorCode::Fail)
212f857971dSopenharmony_ci        } else {
213f857971dSopenharmony_ci            Ok(())
214f857971dSopenharmony_ci        }
215f857971dSopenharmony_ci    }
216f857971dSopenharmony_ci
217f857971dSopenharmony_ci    fn epoll_wait(&self) -> Option<Vec<EpollEvent>>
218f857971dSopenharmony_ci    {
219f857971dSopenharmony_ci        call_debug_enter!("Epoll::epoll_wait");
220f857971dSopenharmony_ci        let mut events: Vec<libc::epoll_event> = Vec::with_capacity(MAX_EPOLL_EVENTS as usize);
221f857971dSopenharmony_ci        // SAFETY:
222f857971dSopenharmony_ci        // The epoll API is multi-thread safe.
223f857971dSopenharmony_ci        // We have carefully ensure that parameters are as required by system interface.
224f857971dSopenharmony_ci        let ret = unsafe {
225f857971dSopenharmony_ci            libc::epoll_wait(self.epoll_fd, events.as_mut_ptr(), MAX_EPOLL_EVENTS, NO_TIMEOUT)
226f857971dSopenharmony_ci        };
227f857971dSopenharmony_ci        if ret < 0 {
228f857971dSopenharmony_ci            error!(LOG_LABEL, "epoll_wait({}) fail: {:?}",
229f857971dSopenharmony_ci                   @public(self.epoll_fd),
230f857971dSopenharmony_ci                   @public(Error::last_os_error()));
231f857971dSopenharmony_ci            return None;
232f857971dSopenharmony_ci        }
233f857971dSopenharmony_ci        let num_of_events = ret as usize;
234f857971dSopenharmony_ci        // SAFETY:
235f857971dSopenharmony_ci        // `epoll_wait` returns the number of events and promise it is within the limit of
236f857971dSopenharmony_ci        // `MAX_EPOLL_EVENTS`.
237f857971dSopenharmony_ci        let epoll_events = unsafe {
238f857971dSopenharmony_ci            std::slice::from_raw_parts(events.as_ptr(), num_of_events)
239f857971dSopenharmony_ci        };
240f857971dSopenharmony_ci        let epoll_events: Vec<EpollEvent> = epoll_events.iter().map(|e| {
241f857971dSopenharmony_ci            EpollEvent {
242f857971dSopenharmony_ci                fd: e.u64 as RawFd,
243f857971dSopenharmony_ci                events: e.events,
244f857971dSopenharmony_ci            }
245f857971dSopenharmony_ci        }).collect();
246f857971dSopenharmony_ci        Some(epoll_events)
247f857971dSopenharmony_ci    }
248f857971dSopenharmony_ci
249f857971dSopenharmony_ci    fn epoll_reset(&self, fd: RawFd) -> FusionResult<()>
250f857971dSopenharmony_ci    {
251f857971dSopenharmony_ci        call_debug_enter!("Epoll::epoll_reset");
252f857971dSopenharmony_ci        let mut ev = libc::epoll_event {
253f857971dSopenharmony_ci            events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR,
254f857971dSopenharmony_ci            u64: fd as u64,
255f857971dSopenharmony_ci        };
256f857971dSopenharmony_ci        // SAFETY:
257f857971dSopenharmony_ci        // The epoll API is multi-thread safe.
258f857971dSopenharmony_ci        // We have carefully ensure that parameters are as required by system interface.
259f857971dSopenharmony_ci        let ret = unsafe {
260f857971dSopenharmony_ci            libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut ev)
261f857971dSopenharmony_ci        };
262f857971dSopenharmony_ci        if ret != EPOLL_SUCCESS {
263f857971dSopenharmony_ci            error!(LOG_LABEL, "In reset_fd, epoll_ctl_mod({},{}) fail: {:?}",
264f857971dSopenharmony_ci                   @public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
265f857971dSopenharmony_ci            Err(FusionErrorCode::Fail)
266f857971dSopenharmony_ci        } else {
267f857971dSopenharmony_ci            Ok(())
268f857971dSopenharmony_ci        }
269f857971dSopenharmony_ci    }
270f857971dSopenharmony_ci
271f857971dSopenharmony_ci    fn add_epoll_handler(&self, fd: RawFd, epoll_handler: EpollHandler)
272f857971dSopenharmony_ci        -> FusionResult<Arc<dyn IEpollHandler>>
273f857971dSopenharmony_ci    {
274f857971dSopenharmony_ci        call_debug_enter!("Epoll::add_epoll_handler");
275f857971dSopenharmony_ci        let mut guard = self.handlers.lock().unwrap();
276f857971dSopenharmony_ci        if guard.contains_key(&fd) {
277f857971dSopenharmony_ci            error!(LOG_LABEL, "Epoll handler ({}) has been added", @public(fd));
278f857971dSopenharmony_ci            return Err(FusionErrorCode::Fail);
279f857971dSopenharmony_ci        }
280f857971dSopenharmony_ci        debug!(LOG_LABEL, "Add epoll handler ({})", @public(fd));
281f857971dSopenharmony_ci        let raw = epoll_handler.raw_handler();
282f857971dSopenharmony_ci        guard.insert(fd, epoll_handler);
283f857971dSopenharmony_ci        let _ = self.epoll_add(fd);
284f857971dSopenharmony_ci        Ok(raw)
285f857971dSopenharmony_ci    }
286f857971dSopenharmony_ci
287f857971dSopenharmony_ci    fn remove_epoll_handler(&self, fd: RawFd) -> FusionResult<Arc<dyn IEpollHandler>>
288f857971dSopenharmony_ci    {
289f857971dSopenharmony_ci        call_debug_enter!("Epoll::remove_epoll_handler");
290f857971dSopenharmony_ci        let mut guard = self.handlers.lock().unwrap();
291f857971dSopenharmony_ci        let _ = self.epoll_del(fd);
292f857971dSopenharmony_ci        if let Some(h) = guard.remove(&fd) {
293f857971dSopenharmony_ci            debug!(LOG_LABEL, "Remove epoll handler ({})", @public(fd));
294f857971dSopenharmony_ci            Ok(h.raw_handler())
295f857971dSopenharmony_ci        } else {
296f857971dSopenharmony_ci            error!(LOG_LABEL, "No epoll handler ({})", @public(fd));
297f857971dSopenharmony_ci            Err(FusionErrorCode::Fail)
298f857971dSopenharmony_ci        }
299f857971dSopenharmony_ci    }
300f857971dSopenharmony_ci
301f857971dSopenharmony_ci    fn wake(&self, events: &[EpollEvent])
302f857971dSopenharmony_ci    {
303f857971dSopenharmony_ci        call_debug_enter!("Epoll::wake");
304f857971dSopenharmony_ci        let mut guard = self.handlers.lock().unwrap();
305f857971dSopenharmony_ci        for e in events {
306f857971dSopenharmony_ci            if let Some(handler) = guard.get_mut(&e.fd) {
307f857971dSopenharmony_ci                debug!(LOG_LABEL, "Wake epoll handler ({})", @public(e.fd));
308f857971dSopenharmony_ci                handler.events = e.events;
309f857971dSopenharmony_ci                if let Some(waker) = &handler.waker {
310f857971dSopenharmony_ci                    waker.wake_by_ref();
311f857971dSopenharmony_ci                }
312f857971dSopenharmony_ci            } else {
313f857971dSopenharmony_ci                error!(LOG_LABEL, "No epoll handler ({})", @public(e.fd));
314f857971dSopenharmony_ci            }
315f857971dSopenharmony_ci        }
316f857971dSopenharmony_ci    }
317f857971dSopenharmony_ci
318f857971dSopenharmony_ci    fn dispatch_inner(&self, fd: RawFd, waker: &Waker) -> Option<(Arc<dyn IEpollHandler>, u32)>
319f857971dSopenharmony_ci    {
320f857971dSopenharmony_ci        call_debug_enter!("Epoll::dispatch_inner");
321f857971dSopenharmony_ci        let mut guard = self.handlers.lock().unwrap();
322f857971dSopenharmony_ci        if let Some(handler) = guard.get_mut(&fd) {
323f857971dSopenharmony_ci            handler.set_waker(waker);
324f857971dSopenharmony_ci            let events = handler.take_events() & LIBC_EPOLLALL;
325f857971dSopenharmony_ci            if events != LIBC_EPOLLNONE {
326f857971dSopenharmony_ci                Some((handler.raw_handler(), events))
327f857971dSopenharmony_ci            } else {
328f857971dSopenharmony_ci                debug!(LOG_LABEL, "No epoll event");
329f857971dSopenharmony_ci                None
330f857971dSopenharmony_ci            }
331f857971dSopenharmony_ci        } else {
332f857971dSopenharmony_ci            error!(LOG_LABEL, "No epoll handler with ({})", @public(fd));
333f857971dSopenharmony_ci            None
334f857971dSopenharmony_ci        }
335f857971dSopenharmony_ci    }
336f857971dSopenharmony_ci
337f857971dSopenharmony_ci    fn dispatch(&self, fd: RawFd, waker: &Waker)
338f857971dSopenharmony_ci    {
339f857971dSopenharmony_ci        call_debug_enter!("Epoll::dispatch");
340f857971dSopenharmony_ci        if let Some((handler, events)) = self.dispatch_inner(fd, waker) {
341f857971dSopenharmony_ci            handler.dispatch(events);
342f857971dSopenharmony_ci            let _ = self.epoll_reset(fd);
343f857971dSopenharmony_ci        }
344f857971dSopenharmony_ci    }
345f857971dSopenharmony_ci}
346f857971dSopenharmony_ci
347f857971dSopenharmony_ciimpl Default for Epoll {
348f857971dSopenharmony_ci    fn default() -> Self
349f857971dSopenharmony_ci    {
350f857971dSopenharmony_ci        Self::new()
351f857971dSopenharmony_ci    }
352f857971dSopenharmony_ci}
353f857971dSopenharmony_ci
354f857971dSopenharmony_ciimpl Drop for Epoll {
355f857971dSopenharmony_ci    fn drop(&mut self)
356f857971dSopenharmony_ci    {
357f857971dSopenharmony_ci        // SAFETY:
358f857971dSopenharmony_ci        // Parameter is as required by system, so consider it safe here.
359f857971dSopenharmony_ci        let ret = unsafe { libc::close(self.epoll_fd) };
360f857971dSopenharmony_ci        if ret != 0 {
361f857971dSopenharmony_ci            error!(LOG_LABEL, "close({}) fail: {:?}",
362f857971dSopenharmony_ci                   @public(self.epoll_fd),
363f857971dSopenharmony_ci                   @public(Error::last_os_error()));
364f857971dSopenharmony_ci        }
365f857971dSopenharmony_ci    }
366f857971dSopenharmony_ci}
367f857971dSopenharmony_ci
368f857971dSopenharmony_cistruct EpollHandlerFuture {
369f857971dSopenharmony_ci    fd: RawFd,
370f857971dSopenharmony_ci    epoll: Arc<Epoll>,
371f857971dSopenharmony_ci}
372f857971dSopenharmony_ci
373f857971dSopenharmony_ciimpl EpollHandlerFuture {
374f857971dSopenharmony_ci    fn new(fd: RawFd, epoll: Arc<Epoll>) -> Self
375f857971dSopenharmony_ci    {
376f857971dSopenharmony_ci        Self { fd, epoll }
377f857971dSopenharmony_ci    }
378f857971dSopenharmony_ci}
379f857971dSopenharmony_ci
380f857971dSopenharmony_ciimpl Future for EpollHandlerFuture {
381f857971dSopenharmony_ci    type Output = ();
382f857971dSopenharmony_ci
383f857971dSopenharmony_ci    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
384f857971dSopenharmony_ci    {
385f857971dSopenharmony_ci        call_debug_enter!("EpollHandlerFuture::poll");
386f857971dSopenharmony_ci        self.epoll.dispatch(self.fd, cx.waker());
387f857971dSopenharmony_ci        Poll::Pending
388f857971dSopenharmony_ci    }
389f857971dSopenharmony_ci}
390f857971dSopenharmony_ci
391f857971dSopenharmony_cistruct EpollWaker {
392f857971dSopenharmony_ci    fds: [RawFd; 2],
393f857971dSopenharmony_ci}
394f857971dSopenharmony_ci
395f857971dSopenharmony_ciimpl EpollWaker {
396f857971dSopenharmony_ci    fn new() -> Self
397f857971dSopenharmony_ci    {
398f857971dSopenharmony_ci        let mut fds: [c_int; 2] = [-1; 2];
399f857971dSopenharmony_ci        // SAFETY:
400f857971dSopenharmony_ci        // The pipe API is multi-thread safe.
401f857971dSopenharmony_ci        // We have carefully checked that parameters are as required by system interface.
402f857971dSopenharmony_ci        let ret = unsafe {
403f857971dSopenharmony_ci            libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK)
404f857971dSopenharmony_ci        };
405f857971dSopenharmony_ci        if ret != 0 {
406f857971dSopenharmony_ci            error!(LOG_LABEL, "pipe2 fail: {:?}", @public(Error::last_os_error()));
407f857971dSopenharmony_ci        }
408f857971dSopenharmony_ci        Self { fds }
409f857971dSopenharmony_ci    }
410f857971dSopenharmony_ci
411f857971dSopenharmony_ci    fn wake(&self)
412f857971dSopenharmony_ci    {
413f857971dSopenharmony_ci        call_debug_enter!("EpollWaker::wake");
414f857971dSopenharmony_ci        let data: i32 = 0;
415f857971dSopenharmony_ci        // SAFETY:
416f857971dSopenharmony_ci        // We have carefully checked that parameters are as required by system interface.
417f857971dSopenharmony_ci        let ret = unsafe {
418f857971dSopenharmony_ci            libc::write(self.fds[1],
419f857971dSopenharmony_ci                        std::ptr::addr_of!(data) as *const c_void,
420f857971dSopenharmony_ci                        std::mem::size_of_val(&data))
421f857971dSopenharmony_ci        };
422f857971dSopenharmony_ci        if ret == SYSTEM_IO_FAILURE {
423f857971dSopenharmony_ci            error!(LOG_LABEL, "write fail: {:?}", @public(Error::last_os_error()));
424f857971dSopenharmony_ci        }
425f857971dSopenharmony_ci    }
426f857971dSopenharmony_ci}
427f857971dSopenharmony_ci
428f857971dSopenharmony_ciimpl IEpollHandler for EpollWaker {
429f857971dSopenharmony_ci    fn fd(&self) -> RawFd
430f857971dSopenharmony_ci    {
431f857971dSopenharmony_ci        self.fds[0]
432f857971dSopenharmony_ci    }
433f857971dSopenharmony_ci
434f857971dSopenharmony_ci    fn dispatch(&self, events: u32)
435f857971dSopenharmony_ci    {
436f857971dSopenharmony_ci        if (events & LIBC_EPOLLIN) == LIBC_EPOLLIN {
437f857971dSopenharmony_ci            let data: i32 = 0;
438f857971dSopenharmony_ci            // SAFETY:
439f857971dSopenharmony_ci            // Parameters are as required by system and business logic, so it can be trusted.
440f857971dSopenharmony_ci            let ret = unsafe {
441f857971dSopenharmony_ci                libc::read(self.fd(),
442f857971dSopenharmony_ci                           std::ptr::addr_of!(data) as *mut c_void,
443f857971dSopenharmony_ci                           std::mem::size_of_val(&data))
444f857971dSopenharmony_ci            };
445f857971dSopenharmony_ci            if ret == SYSTEM_IO_FAILURE {
446f857971dSopenharmony_ci                error!(LOG_LABEL, "read fail: {:?}", @public(Error::last_os_error()));
447f857971dSopenharmony_ci            }
448f857971dSopenharmony_ci        }
449f857971dSopenharmony_ci    }
450f857971dSopenharmony_ci}
451f857971dSopenharmony_ci
452f857971dSopenharmony_ciimpl Default for EpollWaker {
453f857971dSopenharmony_ci    fn default() -> Self
454f857971dSopenharmony_ci    {
455f857971dSopenharmony_ci        Self::new()
456f857971dSopenharmony_ci    }
457f857971dSopenharmony_ci}
458f857971dSopenharmony_ci
459f857971dSopenharmony_ciimpl Drop for EpollWaker {
460f857971dSopenharmony_ci    fn drop(&mut self)
461f857971dSopenharmony_ci    {
462f857971dSopenharmony_ci        for fd in &mut self.fds {
463f857971dSopenharmony_ci            if *fd != INVALID_FD {
464f857971dSopenharmony_ci                // SAFETY:
465f857971dSopenharmony_ci                // Parameter is as required by system, so consider it safe here.
466f857971dSopenharmony_ci                let ret = unsafe { libc::close(*fd) };
467f857971dSopenharmony_ci                if ret != EPOLL_SUCCESS {
468f857971dSopenharmony_ci                    error!(LOG_LABEL, "close({}) fail: {:?}",
469f857971dSopenharmony_ci                           @public(*fd),
470f857971dSopenharmony_ci                           @public(Error::last_os_error()));
471f857971dSopenharmony_ci                }
472f857971dSopenharmony_ci            }
473f857971dSopenharmony_ci        }
474f857971dSopenharmony_ci    }
475f857971dSopenharmony_ci}
476f857971dSopenharmony_ci
477f857971dSopenharmony_ci/// Bookkeeping of epoll handling.
478f857971dSopenharmony_cipub struct Scheduler {
479f857971dSopenharmony_ci    epoll: Arc<Epoll>,
480f857971dSopenharmony_ci    epoll_waker: Arc<EpollWaker>,
481f857971dSopenharmony_ci    is_running: Arc<AtomicBool>,
482f857971dSopenharmony_ci    join_handle: Option<std::thread::JoinHandle<()>>,
483f857971dSopenharmony_ci}
484f857971dSopenharmony_ci
485f857971dSopenharmony_ciimpl Scheduler {
486f857971dSopenharmony_ci    pub(crate) fn new() -> Self
487f857971dSopenharmony_ci    {
488f857971dSopenharmony_ci        call_debug_enter!("Scheduler::new");
489f857971dSopenharmony_ci        let epoll: Arc<Epoll> = Arc::default();
490f857971dSopenharmony_ci        let is_running = Arc::new(AtomicBool::new(true));
491f857971dSopenharmony_ci        let driver = Driver::new(epoll.clone(), is_running.clone());
492f857971dSopenharmony_ci        let join_handle = std::thread::spawn(move || {
493f857971dSopenharmony_ci            driver.run();
494f857971dSopenharmony_ci        });
495f857971dSopenharmony_ci        let scheduler = Self {
496f857971dSopenharmony_ci            epoll,
497f857971dSopenharmony_ci            epoll_waker: Arc::default(),
498f857971dSopenharmony_ci            is_running,
499f857971dSopenharmony_ci            join_handle: Some(join_handle),
500f857971dSopenharmony_ci        };
501f857971dSopenharmony_ci        let _ = scheduler.add_epoll_handler(scheduler.epoll_waker.clone());
502f857971dSopenharmony_ci        scheduler
503f857971dSopenharmony_ci    }
504f857971dSopenharmony_ci
505f857971dSopenharmony_ci    pub(crate) fn add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
506f857971dSopenharmony_ci        -> FusionResult<Arc<dyn IEpollHandler>>
507f857971dSopenharmony_ci    {
508f857971dSopenharmony_ci        call_debug_enter!("Scheduler::add_epoll_handler");
509f857971dSopenharmony_ci        let fd: RawFd = handler.fd();
510f857971dSopenharmony_ci        let join_handle = ylong_runtime::spawn(
511f857971dSopenharmony_ci            EpollHandlerFuture::new(fd, self.epoll.clone())
512f857971dSopenharmony_ci        );
513f857971dSopenharmony_ci        self.epoll.add_epoll_handler(fd, EpollHandler::new(handler, join_handle))
514f857971dSopenharmony_ci    }
515f857971dSopenharmony_ci
516f857971dSopenharmony_ci    pub(crate) fn remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
517f857971dSopenharmony_ci        -> FusionResult<Arc<dyn IEpollHandler>>
518f857971dSopenharmony_ci    {
519f857971dSopenharmony_ci        call_debug_enter!("Scheduler::remove_epoll_handler");
520f857971dSopenharmony_ci        self.epoll.remove_epoll_handler(handler.fd())
521f857971dSopenharmony_ci    }
522f857971dSopenharmony_ci}
523f857971dSopenharmony_ci
524f857971dSopenharmony_ciimpl Default for Scheduler {
525f857971dSopenharmony_ci    fn default() -> Self
526f857971dSopenharmony_ci    {
527f857971dSopenharmony_ci        Self::new()
528f857971dSopenharmony_ci    }
529f857971dSopenharmony_ci}
530f857971dSopenharmony_ci
531f857971dSopenharmony_ciimpl Drop for Scheduler {
532f857971dSopenharmony_ci    fn drop(&mut self)
533f857971dSopenharmony_ci    {
534f857971dSopenharmony_ci        call_debug_enter!("Scheduler::drop");
535f857971dSopenharmony_ci        self.is_running.store(false, Ordering::Relaxed);
536f857971dSopenharmony_ci        self.epoll_waker.wake();
537f857971dSopenharmony_ci        if let Some(join_handle) = self.join_handle.take() {
538f857971dSopenharmony_ci            let _ = join_handle.join();
539f857971dSopenharmony_ci        }
540f857971dSopenharmony_ci    }
541f857971dSopenharmony_ci}
542