1cac7dca0Sopenharmony_ci// Copyright (c) 2023 Huawei Device Co., Ltd. 2cac7dca0Sopenharmony_ci// Licensed under the Apache License, Version 2.0 (the "License"); 3cac7dca0Sopenharmony_ci// you may not use this file except in compliance with the License. 4cac7dca0Sopenharmony_ci// You may obtain a copy of the License at 5cac7dca0Sopenharmony_ci// 6cac7dca0Sopenharmony_ci// http://www.apache.org/licenses/LICENSE-2.0 7cac7dca0Sopenharmony_ci// 8cac7dca0Sopenharmony_ci// Unless required by applicable law or agreed to in writing, software 9cac7dca0Sopenharmony_ci// distributed under the License is distributed on an "AS IS" BASIS, 10cac7dca0Sopenharmony_ci// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11cac7dca0Sopenharmony_ci// See the License for the specific language governing permissions and 12cac7dca0Sopenharmony_ci// limitations under the License. 13cac7dca0Sopenharmony_ci 14cac7dca0Sopenharmony_ciuse std::future::Future; 15cac7dca0Sopenharmony_ciuse std::panic; 16cac7dca0Sopenharmony_ciuse std::ptr::NonNull; 17cac7dca0Sopenharmony_ciuse std::task::{Context, Poll, Waker}; 18cac7dca0Sopenharmony_ci 19cac7dca0Sopenharmony_ciuse crate::error::{ErrorKind, ScheduleError}; 20cac7dca0Sopenharmony_ciuse crate::executor::Schedule; 21cac7dca0Sopenharmony_ciuse crate::task::raw::{Header, Inner, TaskMngInfo}; 22cac7dca0Sopenharmony_ciuse crate::task::state; 23cac7dca0Sopenharmony_ciuse crate::task::state::StateAction; 24cac7dca0Sopenharmony_ciuse crate::task::waker::WakerRefHeader; 25cac7dca0Sopenharmony_ci 26cac7dca0Sopenharmony_cicfg_not_ffrt! { 27cac7dca0Sopenharmony_ci use crate::task::Task; 28cac7dca0Sopenharmony_ci} 29cac7dca0Sopenharmony_ci 30cac7dca0Sopenharmony_cipub(crate) struct TaskHandle<T: Future, S: Schedule> { 31cac7dca0Sopenharmony_ci task: NonNull<TaskMngInfo<T, S>>, 32cac7dca0Sopenharmony_ci} 33cac7dca0Sopenharmony_ci 34cac7dca0Sopenharmony_ciimpl<T, S> TaskHandle<T, S> 35cac7dca0Sopenharmony_ciwhere 36cac7dca0Sopenharmony_ci T: Future, 37cac7dca0Sopenharmony_ci S: Schedule, 38cac7dca0Sopenharmony_ci{ 39cac7dca0Sopenharmony_ci pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Self { 40cac7dca0Sopenharmony_ci TaskHandle { 41cac7dca0Sopenharmony_ci task: ptr.cast::<TaskMngInfo<T, S>>(), 42cac7dca0Sopenharmony_ci } 43cac7dca0Sopenharmony_ci } 44cac7dca0Sopenharmony_ci 45cac7dca0Sopenharmony_ci fn header(&self) -> &Header { 46cac7dca0Sopenharmony_ci unsafe { self.task.as_ref().header() } 47cac7dca0Sopenharmony_ci } 48cac7dca0Sopenharmony_ci 49cac7dca0Sopenharmony_ci fn inner(&self) -> &Inner<T, S> { 50cac7dca0Sopenharmony_ci unsafe { self.task.as_ref().inner() } 51cac7dca0Sopenharmony_ci } 52cac7dca0Sopenharmony_ci} 53cac7dca0Sopenharmony_ci 54cac7dca0Sopenharmony_ciimpl<T, S> TaskHandle<T, S> 55cac7dca0Sopenharmony_ciwhere 56cac7dca0Sopenharmony_ci T: Future, 57cac7dca0Sopenharmony_ci S: Schedule, 58cac7dca0Sopenharmony_ci{ 59cac7dca0Sopenharmony_ci fn finish(self, state: usize, output: Result<T::Output, ScheduleError>) { 60cac7dca0Sopenharmony_ci // send result if the JoinHandle is not dropped 61cac7dca0Sopenharmony_ci if state::is_care_join_handle(state) { 62cac7dca0Sopenharmony_ci self.inner().send_result(output); 63cac7dca0Sopenharmony_ci } else { 64cac7dca0Sopenharmony_ci self.inner().turning_to_used_data(); 65cac7dca0Sopenharmony_ci } 66cac7dca0Sopenharmony_ci 67cac7dca0Sopenharmony_ci let cur = match self.header().state.turning_to_finish() { 68cac7dca0Sopenharmony_ci Ok(cur) => cur, 69cac7dca0Sopenharmony_ci Err(e) => panic!("{}", e.as_str()), 70cac7dca0Sopenharmony_ci }; 71cac7dca0Sopenharmony_ci 72cac7dca0Sopenharmony_ci if state::is_set_waker(cur) { 73cac7dca0Sopenharmony_ci self.inner().wake_join(); 74cac7dca0Sopenharmony_ci } 75cac7dca0Sopenharmony_ci self.drop_ref(); 76cac7dca0Sopenharmony_ci } 77cac7dca0Sopenharmony_ci 78cac7dca0Sopenharmony_ci pub(crate) fn release(self) { 79cac7dca0Sopenharmony_ci unsafe { drop(Box::from_raw(self.task.as_ptr())) }; 80cac7dca0Sopenharmony_ci } 81cac7dca0Sopenharmony_ci 82cac7dca0Sopenharmony_ci pub(crate) fn drop_ref(self) { 83cac7dca0Sopenharmony_ci let prev = self.header().state.dec_ref(); 84cac7dca0Sopenharmony_ci if state::is_last_ref_count(prev) { 85cac7dca0Sopenharmony_ci self.release(); 86cac7dca0Sopenharmony_ci } 87cac7dca0Sopenharmony_ci } 88cac7dca0Sopenharmony_ci 89cac7dca0Sopenharmony_ci pub(crate) fn get_result(self, out: &mut Poll<std::result::Result<T::Output, ScheduleError>>) { 90cac7dca0Sopenharmony_ci *out = Poll::Ready(self.inner().turning_to_get_data()); 91cac7dca0Sopenharmony_ci } 92cac7dca0Sopenharmony_ci 93cac7dca0Sopenharmony_ci pub(crate) fn drop_join_handle(self) { 94cac7dca0Sopenharmony_ci if self.header().state.try_turning_to_un_join_handle() { 95cac7dca0Sopenharmony_ci return; 96cac7dca0Sopenharmony_ci } 97cac7dca0Sopenharmony_ci 98cac7dca0Sopenharmony_ci match self.header().state.turn_to_un_join_handle() { 99cac7dca0Sopenharmony_ci Ok(_) => {} 100cac7dca0Sopenharmony_ci Err(_) => { 101cac7dca0Sopenharmony_ci self.inner().turning_to_used_data(); 102cac7dca0Sopenharmony_ci } 103cac7dca0Sopenharmony_ci } 104cac7dca0Sopenharmony_ci self.drop_ref(); 105cac7dca0Sopenharmony_ci } 106cac7dca0Sopenharmony_ci 107cac7dca0Sopenharmony_ci fn set_waker_inner(&self, des_waker: Waker, cur_state: usize) -> Result<usize, usize> { 108cac7dca0Sopenharmony_ci assert!( 109cac7dca0Sopenharmony_ci state::is_care_join_handle(cur_state), 110cac7dca0Sopenharmony_ci "set waker failed: the join handle has been dropped" 111cac7dca0Sopenharmony_ci ); 112cac7dca0Sopenharmony_ci assert!( 113cac7dca0Sopenharmony_ci !state::is_set_waker(cur_state), 114cac7dca0Sopenharmony_ci "set waker failed: the task already has a waker set" 115cac7dca0Sopenharmony_ci ); 116cac7dca0Sopenharmony_ci 117cac7dca0Sopenharmony_ci unsafe { 118cac7dca0Sopenharmony_ci let waker = self.inner().waker.get(); 119cac7dca0Sopenharmony_ci *waker = Some(des_waker); 120cac7dca0Sopenharmony_ci } 121cac7dca0Sopenharmony_ci let result = self.header().state.turn_to_set_waker(); 122cac7dca0Sopenharmony_ci if result.is_err() { 123cac7dca0Sopenharmony_ci unsafe { 124cac7dca0Sopenharmony_ci let waker = self.inner().waker.get(); 125cac7dca0Sopenharmony_ci *waker = None; 126cac7dca0Sopenharmony_ci } 127cac7dca0Sopenharmony_ci } 128cac7dca0Sopenharmony_ci result 129cac7dca0Sopenharmony_ci } 130cac7dca0Sopenharmony_ci 131cac7dca0Sopenharmony_ci pub(crate) fn set_waker(self, cur: usize, des_waker: &Waker) -> bool { 132cac7dca0Sopenharmony_ci let res = if state::is_set_waker(cur) { 133cac7dca0Sopenharmony_ci let is_same_waker = unsafe { 134cac7dca0Sopenharmony_ci // the status is set_waker, so waker must be set already 135cac7dca0Sopenharmony_ci let waker = self.inner().waker.get(); 136cac7dca0Sopenharmony_ci (*waker) 137cac7dca0Sopenharmony_ci .as_ref() 138cac7dca0Sopenharmony_ci .expect("task status is set_waker, but waker is missing") 139cac7dca0Sopenharmony_ci .will_wake(des_waker) 140cac7dca0Sopenharmony_ci }; 141cac7dca0Sopenharmony_ci // we don't register the same waker 142cac7dca0Sopenharmony_ci if is_same_waker { 143cac7dca0Sopenharmony_ci return false; 144cac7dca0Sopenharmony_ci } 145cac7dca0Sopenharmony_ci self.header() 146cac7dca0Sopenharmony_ci .state 147cac7dca0Sopenharmony_ci .turn_to_un_set_waker() 148cac7dca0Sopenharmony_ci .and_then(|cur| self.set_waker_inner(des_waker.clone(), cur)) 149cac7dca0Sopenharmony_ci } else { 150cac7dca0Sopenharmony_ci self.set_waker_inner(des_waker.clone(), cur) 151cac7dca0Sopenharmony_ci }; 152cac7dca0Sopenharmony_ci 153cac7dca0Sopenharmony_ci if let Err(cur) = res { 154cac7dca0Sopenharmony_ci assert!( 155cac7dca0Sopenharmony_ci state::is_finished(cur), 156cac7dca0Sopenharmony_ci "setting waker should only be failed dur to task completion" 157cac7dca0Sopenharmony_ci ); 158cac7dca0Sopenharmony_ci return true; 159cac7dca0Sopenharmony_ci } 160cac7dca0Sopenharmony_ci 161cac7dca0Sopenharmony_ci false 162cac7dca0Sopenharmony_ci } 163cac7dca0Sopenharmony_ci} 164cac7dca0Sopenharmony_ci 165cac7dca0Sopenharmony_ci#[cfg(not(feature = "ffrt"))] 166cac7dca0Sopenharmony_ciimpl<T, S> TaskHandle<T, S> 167cac7dca0Sopenharmony_ciwhere 168cac7dca0Sopenharmony_ci T: Future, 169cac7dca0Sopenharmony_ci S: Schedule, 170cac7dca0Sopenharmony_ci{ 171cac7dca0Sopenharmony_ci // Runs the task 172cac7dca0Sopenharmony_ci pub(crate) fn run(self) { 173cac7dca0Sopenharmony_ci let action = self.header().state.turning_to_running(); 174cac7dca0Sopenharmony_ci 175cac7dca0Sopenharmony_ci match action { 176cac7dca0Sopenharmony_ci StateAction::Success => {} 177cac7dca0Sopenharmony_ci StateAction::Canceled(cur) => { 178cac7dca0Sopenharmony_ci let output = self.get_canceled(); 179cac7dca0Sopenharmony_ci return self.finish(cur, Err(output)); 180cac7dca0Sopenharmony_ci } 181cac7dca0Sopenharmony_ci StateAction::Failed(state) => panic!("task state invalid: {state}"), 182cac7dca0Sopenharmony_ci _ => unreachable!(), 183cac7dca0Sopenharmony_ci }; 184cac7dca0Sopenharmony_ci 185cac7dca0Sopenharmony_ci // turn the task header into a waker 186cac7dca0Sopenharmony_ci let waker = WakerRefHeader::<'_>::new::<T>(self.header()); 187cac7dca0Sopenharmony_ci let mut context = Context::from_waker(&waker); 188cac7dca0Sopenharmony_ci 189cac7dca0Sopenharmony_ci let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { 190cac7dca0Sopenharmony_ci self.inner().poll(&mut context).map(Ok) 191cac7dca0Sopenharmony_ci })); 192cac7dca0Sopenharmony_ci 193cac7dca0Sopenharmony_ci let cur = self.header().state.get_current_state(); 194cac7dca0Sopenharmony_ci match res { 195cac7dca0Sopenharmony_ci Ok(Poll::Ready(output)) => { 196cac7dca0Sopenharmony_ci // send result if the JoinHandle is not dropped 197cac7dca0Sopenharmony_ci self.finish(cur, output); 198cac7dca0Sopenharmony_ci } 199cac7dca0Sopenharmony_ci 200cac7dca0Sopenharmony_ci Ok(Poll::Pending) => match self.header().state.turning_to_idle() { 201cac7dca0Sopenharmony_ci StateAction::Enqueue => { 202cac7dca0Sopenharmony_ci self.get_scheduled(true); 203cac7dca0Sopenharmony_ci } 204cac7dca0Sopenharmony_ci StateAction::Failed(state) => panic!("task state invalid: {state}"), 205cac7dca0Sopenharmony_ci StateAction::Canceled(state) => { 206cac7dca0Sopenharmony_ci let output = self.get_canceled(); 207cac7dca0Sopenharmony_ci self.finish(state, Err(output)); 208cac7dca0Sopenharmony_ci } 209cac7dca0Sopenharmony_ci _ => {} 210cac7dca0Sopenharmony_ci }, 211cac7dca0Sopenharmony_ci 212cac7dca0Sopenharmony_ci Err(_) => { 213cac7dca0Sopenharmony_ci let output = Err(ScheduleError::new(ErrorKind::Panic, "panic happen")); 214cac7dca0Sopenharmony_ci self.finish(cur, output); 215cac7dca0Sopenharmony_ci } 216cac7dca0Sopenharmony_ci } 217cac7dca0Sopenharmony_ci } 218cac7dca0Sopenharmony_ci 219cac7dca0Sopenharmony_ci pub(crate) unsafe fn shutdown(self) { 220cac7dca0Sopenharmony_ci // Check if the JoinHandle gets dropped already. If JoinHandle is still there, 221cac7dca0Sopenharmony_ci // wakes the JoinHandle. 222cac7dca0Sopenharmony_ci let cur = self.header().state.dec_ref(); 223cac7dca0Sopenharmony_ci if state::ref_count(cur) > 0 && state::is_care_join_handle(cur) { 224cac7dca0Sopenharmony_ci self.set_canceled(); 225cac7dca0Sopenharmony_ci } else { 226cac7dca0Sopenharmony_ci self.release(); 227cac7dca0Sopenharmony_ci } 228cac7dca0Sopenharmony_ci } 229cac7dca0Sopenharmony_ci 230cac7dca0Sopenharmony_ci pub(crate) fn wake(self) { 231cac7dca0Sopenharmony_ci self.wake_by_ref(); 232cac7dca0Sopenharmony_ci self.drop_ref(); 233cac7dca0Sopenharmony_ci } 234cac7dca0Sopenharmony_ci 235cac7dca0Sopenharmony_ci pub(crate) fn wake_by_ref(&self) { 236cac7dca0Sopenharmony_ci let prev = self.header().state.turn_to_scheduling(); 237cac7dca0Sopenharmony_ci if state::need_enqueue(prev) { 238cac7dca0Sopenharmony_ci self.get_scheduled(false); 239cac7dca0Sopenharmony_ci } 240cac7dca0Sopenharmony_ci } 241cac7dca0Sopenharmony_ci 242cac7dca0Sopenharmony_ci // Actually cancels the task during running 243cac7dca0Sopenharmony_ci fn get_canceled(&self) -> ScheduleError { 244cac7dca0Sopenharmony_ci self.inner().turning_to_used_data(); 245cac7dca0Sopenharmony_ci ErrorKind::TaskCanceled.into() 246cac7dca0Sopenharmony_ci } 247cac7dca0Sopenharmony_ci 248cac7dca0Sopenharmony_ci // Sets task state into canceled and scheduled 249cac7dca0Sopenharmony_ci pub(crate) fn set_canceled(&self) { 250cac7dca0Sopenharmony_ci if self.header().state.turn_to_canceled_and_scheduled() { 251cac7dca0Sopenharmony_ci self.get_scheduled(false); 252cac7dca0Sopenharmony_ci } 253cac7dca0Sopenharmony_ci } 254cac7dca0Sopenharmony_ci 255cac7dca0Sopenharmony_ci fn to_task(&self) -> Task { 256cac7dca0Sopenharmony_ci unsafe { Task::from_raw(self.header().into()) } 257cac7dca0Sopenharmony_ci } 258cac7dca0Sopenharmony_ci 259cac7dca0Sopenharmony_ci fn get_scheduled(&self, lifo: bool) { 260cac7dca0Sopenharmony_ci // the scheduler must exist when calling this method 261cac7dca0Sopenharmony_ci self.inner() 262cac7dca0Sopenharmony_ci .scheduler 263cac7dca0Sopenharmony_ci .upgrade() 264cac7dca0Sopenharmony_ci .expect("the scheduler has already been dropped") 265cac7dca0Sopenharmony_ci .schedule(self.to_task(), lifo); 266cac7dca0Sopenharmony_ci } 267cac7dca0Sopenharmony_ci} 268cac7dca0Sopenharmony_ci 269cac7dca0Sopenharmony_ci#[cfg(feature = "ffrt")] 270cac7dca0Sopenharmony_ciimpl<T, S> TaskHandle<T, S> 271cac7dca0Sopenharmony_ciwhere 272cac7dca0Sopenharmony_ci T: Future, 273cac7dca0Sopenharmony_ci S: Schedule, 274cac7dca0Sopenharmony_ci{ 275cac7dca0Sopenharmony_ci pub(crate) fn ffrt_run(self) -> bool { 276cac7dca0Sopenharmony_ci self.inner().get_task_ctx(); 277cac7dca0Sopenharmony_ci 278cac7dca0Sopenharmony_ci match self.header().state.turning_to_running() { 279cac7dca0Sopenharmony_ci StateAction::Failed(state) => panic!("turning to running failed: {:b}", state), 280cac7dca0Sopenharmony_ci StateAction::Canceled(cur) => { 281cac7dca0Sopenharmony_ci let output = self.ffrt_get_canceled(); 282cac7dca0Sopenharmony_ci self.finish(cur, Err(output)); 283cac7dca0Sopenharmony_ci return true; 284cac7dca0Sopenharmony_ci } 285cac7dca0Sopenharmony_ci _ => {} 286cac7dca0Sopenharmony_ci } 287cac7dca0Sopenharmony_ci 288cac7dca0Sopenharmony_ci let waker = WakerRefHeader::<'_>::new::<T>(self.header()); 289cac7dca0Sopenharmony_ci let mut context = Context::from_waker(&waker); 290cac7dca0Sopenharmony_ci 291cac7dca0Sopenharmony_ci let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { 292cac7dca0Sopenharmony_ci self.inner().poll(&mut context).map(Ok) 293cac7dca0Sopenharmony_ci })); 294cac7dca0Sopenharmony_ci 295cac7dca0Sopenharmony_ci let cur = self.header().state.get_current_state(); 296cac7dca0Sopenharmony_ci match res { 297cac7dca0Sopenharmony_ci Ok(Poll::Ready(output)) => { 298cac7dca0Sopenharmony_ci // send result if the JoinHandle is not dropped 299cac7dca0Sopenharmony_ci self.finish(cur, output); 300cac7dca0Sopenharmony_ci true 301cac7dca0Sopenharmony_ci } 302cac7dca0Sopenharmony_ci 303cac7dca0Sopenharmony_ci Ok(Poll::Pending) => match self.header().state.turning_to_idle() { 304cac7dca0Sopenharmony_ci StateAction::Enqueue => { 305cac7dca0Sopenharmony_ci let ffrt_task = unsafe { (*self.inner().task.get()).as_ref().unwrap() }; 306cac7dca0Sopenharmony_ci ffrt_task.wake_task(); 307cac7dca0Sopenharmony_ci false 308cac7dca0Sopenharmony_ci } 309cac7dca0Sopenharmony_ci StateAction::Failed(state) => panic!("task state invalid: {:b}", state), 310cac7dca0Sopenharmony_ci StateAction::Canceled(state) => { 311cac7dca0Sopenharmony_ci let output = self.ffrt_get_canceled(); 312cac7dca0Sopenharmony_ci self.finish(state, Err(output)); 313cac7dca0Sopenharmony_ci true 314cac7dca0Sopenharmony_ci } 315cac7dca0Sopenharmony_ci _ => false, 316cac7dca0Sopenharmony_ci }, 317cac7dca0Sopenharmony_ci 318cac7dca0Sopenharmony_ci Err(_) => { 319cac7dca0Sopenharmony_ci let output = Err(ScheduleError::new(ErrorKind::Panic, "panic happen")); 320cac7dca0Sopenharmony_ci self.finish(cur, output); 321cac7dca0Sopenharmony_ci true 322cac7dca0Sopenharmony_ci } 323cac7dca0Sopenharmony_ci } 324cac7dca0Sopenharmony_ci } 325cac7dca0Sopenharmony_ci 326cac7dca0Sopenharmony_ci pub(crate) fn ffrt_wake(self) { 327cac7dca0Sopenharmony_ci self.ffrt_wake_by_ref(); 328cac7dca0Sopenharmony_ci self.drop_ref(); 329cac7dca0Sopenharmony_ci } 330cac7dca0Sopenharmony_ci 331cac7dca0Sopenharmony_ci pub(crate) fn ffrt_wake_by_ref(&self) { 332cac7dca0Sopenharmony_ci let prev = self.header().state.turn_to_scheduling(); 333cac7dca0Sopenharmony_ci if state::need_enqueue(prev) { 334cac7dca0Sopenharmony_ci let ffrt_task = unsafe { (*self.inner().task.get()).as_ref().unwrap() }; 335cac7dca0Sopenharmony_ci ffrt_task.wake_task(); 336cac7dca0Sopenharmony_ci } 337cac7dca0Sopenharmony_ci } 338cac7dca0Sopenharmony_ci 339cac7dca0Sopenharmony_ci // Actually cancels the task during running 340cac7dca0Sopenharmony_ci fn ffrt_get_canceled(&self) -> ScheduleError { 341cac7dca0Sopenharmony_ci self.inner().turning_to_used_data(); 342cac7dca0Sopenharmony_ci ErrorKind::TaskCanceled.into() 343cac7dca0Sopenharmony_ci } 344cac7dca0Sopenharmony_ci 345cac7dca0Sopenharmony_ci // Sets task state into canceled and scheduled 346cac7dca0Sopenharmony_ci pub(crate) fn ffrt_set_canceled(&self) { 347cac7dca0Sopenharmony_ci if self.header().state.turn_to_canceled_and_scheduled() { 348cac7dca0Sopenharmony_ci let ffrt_task = unsafe { (*self.inner().task.get()).as_ref().unwrap() }; 349cac7dca0Sopenharmony_ci ffrt_task.wake_task(); 350cac7dca0Sopenharmony_ci } 351cac7dca0Sopenharmony_ci } 352cac7dca0Sopenharmony_ci} 353