1/* 2 * Copyright (C) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16//! Tests of asynchronous scheduling. 17 18use std::ffi::{ c_void, c_char, c_int, CString }; 19use std::io::Error; 20use std::os::fd::RawFd; 21use std::sync::{ Arc, Condvar, Mutex }; 22use std::sync::atomic::{ AtomicI32, Ordering }; 23use std::time::Duration; 24 25use hilog_rust::{ debug, info, error, hilog, HiLogLabel, LogType }; 26 27use fusion_scheduler_rust::{ Handler, IEpollHandler, LIBC_EPOLLIN }; 28use fusion_utils_rust::call_debug_enter; 29 30const LOG_LABEL: HiLogLabel = HiLogLabel { 31 log_type: LogType::LogCore, 32 domain: 0xD002220, 33 tag: "FusionSchedulerTest", 34}; 35 36struct EpollHandlerImpl { 37 fds: [RawFd; 2], 38 data: i32, 39} 40 41impl EpollHandlerImpl { 42 fn signal(&self, data: i32) 43 { 44 error!(LOG_LABEL, "EpollHandlerImpl::signal once"); 45 let ret = unsafe { 46 libc::write(self.fds[1], std::ptr::addr_of!(data) as *const c_void, std::mem::size_of_val(&data)) 47 }; 48 if ret == -1 { 49 error!(LOG_LABEL, "libc::write fail"); 50 } 51 } 52 53 fn fd(&self) -> RawFd 54 { 55 self.fds[0] 56 } 57 58 fn dispatch(&mut self, events: u32) 59 { 60 call_debug_enter!("EpollHandlerImpl::dispatch"); 61 if (events & LIBC_EPOLLIN) == LIBC_EPOLLIN { 62 let data: i32 = 0; 63 64 let ret = unsafe { 65 libc::read(self.fds[0], std::ptr::addr_of!(data) as *mut c_void, std::mem::size_of_val(&data)) 66 }; 67 if ret == -1 { 68 error!(LOG_LABEL, "libc::read fail"); 69 } 70 info!(LOG_LABEL, "EpollHandlerImpl::dispatch({}), data:{}", @public(self.fds[0]), @public(data)); 71 self.data = data; 72 } 73 } 74 75 fn data(&self) -> i32 76 { 77 self.data 78 } 79} 80 81impl Drop for EpollHandlerImpl { 82 fn drop(&mut self) 83 { 84 for fd in &mut self.fds { 85 if *fd != -1 { 86 unsafe { libc::close(*fd) }; 87 *fd = -1; 88 } 89 } 90 } 91} 92 93struct EpollHandler { 94 inner: Mutex<EpollHandlerImpl>, 95 var: Condvar, 96} 97 98impl EpollHandler { 99 fn new() -> Self 100 { 101 let mut fds: [c_int; 2] = [-1; 2]; 102 103 let ret = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) }; 104 if ret != 0 { 105 error!(LOG_LABEL, "In EpollHandler::new, libc::pipe2 fail:{:?}", @public(Error::last_os_error())); 106 } 107 debug!(LOG_LABEL, "In EpollHandler::new, fds:({},{})", @public(fds[0]), @public(fds[1])); 108 Self { 109 inner: Mutex::new(EpollHandlerImpl { 110 fds, 111 data: -1, 112 }), 113 var: Condvar::new(), 114 } 115 } 116 117 fn signal(&self, data: i32) 118 { 119 let guard = self.inner.lock().unwrap(); 120 guard.signal(data); 121 } 122 123 fn data(&self) -> i32 124 { 125 let guard = self.inner.lock().unwrap(); 126 guard.data() 127 } 128 129 fn wait(&self, dur: Duration) -> bool 130 { 131 call_debug_enter!("EpollHandler::wait"); 132 let guard = self.inner.lock().unwrap(); 133 let (_, ret) = self.var.wait_timeout(guard, dur).unwrap(); 134 if ret.timed_out() { 135 info!(LOG_LABEL, "In EpollHandler::wait, timeout"); 136 false 137 } else { 138 true 139 } 140 } 141} 142 143impl IEpollHandler for EpollHandler { 144 fn fd(&self) -> RawFd 145 { 146 let guard = self.inner.lock().unwrap(); 147 guard.fd() 148 } 149 150 fn dispatch(&self, events: u32) 151 { 152 call_debug_enter!("EpollHandler::dispatch"); 153 let mut guard = self.inner.lock().unwrap(); 154 guard.dispatch(events); 155 self.var.notify_one(); 156 } 157} 158 159#[test] 160fn test_add_epoll_handler() 161{ 162 let handler: Arc<Handler> = Arc::default(); 163 let epoll = Arc::new(EpollHandler::new()); 164 assert!(handler.add_epoll_handler(epoll.clone()).is_ok()); 165 166 let data: i32 = 13574; 167 epoll.signal(data); 168 assert!(epoll.wait(Duration::from_millis(100))); 169 info!(LOG_LABEL, "In test_add_epoll_handler, data:{}", @public(epoll.data())); 170 assert_eq!(epoll.data(), data); 171 assert!(handler.remove_epoll_handler(epoll).is_ok()); 172} 173 174fn hash(param: usize) -> usize 175{ 176 const HASHER: usize = 0xAAAAAAAA; 177 HASHER ^ param 178} 179 180#[test] 181fn test_post_sync_task() 182{ 183 let handler: Arc<Handler> = Arc::default(); 184 let param: usize = 0xAB1807; 185 186 let ret = handler.post_sync_task(move || { 187 hash(param) 188 }); 189 let expected = hash(param); 190 assert_eq!(ret, expected); 191} 192 193#[test] 194fn test_post_async_task() 195{ 196 let handler: Arc<Handler> = Arc::default(); 197 let param: usize = 0xAB1807; 198 199 let mut task_handle = handler.post_async_task(move || { 200 hash(param) 201 }); 202 let ret = task_handle.result().unwrap(); 203 let expected = hash(param); 204 assert_eq!(ret, expected); 205} 206 207#[test] 208fn test_post_perioric_task() 209{ 210 let handler: Arc<Handler> = Arc::default(); 211 let epoll = Arc::new(EpollHandler::new()); 212 let cloned_epoll = epoll.clone(); 213 assert!(handler.add_epoll_handler(epoll.clone()).is_ok()); 214 215 let _ = handler.post_perioric_task(move || { 216 static ID_RADIX: AtomicI32 = AtomicI32::new(1); 217 cloned_epoll.signal(ID_RADIX.fetch_add(1, Ordering::Relaxed)); 218 }, None, Duration::from_millis(100), Some(10)); 219 220 std::thread::sleep(Duration::from_secs(1)); 221 info!(LOG_LABEL, "In test_post_perioric_task, data:{}", @public(epoll.data())); 222 assert!(epoll.data() >= 10); 223 assert!(handler.remove_epoll_handler(epoll).is_ok()); 224} 225 226#[test] 227fn test_post_delayed_task() 228{ 229 let handler: Arc<Handler> = Arc::default(); 230 let epoll = Arc::new(EpollHandler::new()); 231 assert!(handler.add_epoll_handler(epoll.clone()).is_ok()); 232 let data: i32 = 13547; 233 let cloned_epoll = epoll.clone(); 234 235 let _ = handler.post_delayed_task(move || { 236 cloned_epoll.signal(data); 237 }, Duration::from_millis(10)); 238 239 assert!(epoll.wait(Duration::from_millis(100))); 240 info!(LOG_LABEL, "In test_post_delayed_task, data:{}", @public(epoll.data())); 241 assert_eq!(epoll.data(), data); 242 assert!(handler.remove_epoll_handler(epoll).is_ok()); 243} 244 245#[test] 246fn test_post_blocking_task() 247{ 248 let handler: Arc<Handler> = Arc::default(); 249 let param: usize = 0xAB1807; 250 251 let mut task_handle = handler.post_blocking_task(move || { 252 std::thread::sleep(Duration::from_millis(100)); 253 hash(param) 254 }); 255 let ret = task_handle.result().unwrap(); 256 let expected = hash(param); 257 assert_eq!(ret, expected); 258} 259