1cac7dca0Sopenharmony_ci// Copyright (c) 2023 Huawei Device Co., Ltd.
2cac7dca0Sopenharmony_ci// Licensed under the Apache License, Version 2.0 (the "License");
3cac7dca0Sopenharmony_ci// you may not use this file except in compliance with the License.
4cac7dca0Sopenharmony_ci// You may obtain a copy of the License at
5cac7dca0Sopenharmony_ci//
6cac7dca0Sopenharmony_ci//     http://www.apache.org/licenses/LICENSE-2.0
7cac7dca0Sopenharmony_ci//
8cac7dca0Sopenharmony_ci// Unless required by applicable law or agreed to in writing, software
9cac7dca0Sopenharmony_ci// distributed under the License is distributed on an "AS IS" BASIS,
10cac7dca0Sopenharmony_ci// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11cac7dca0Sopenharmony_ci// See the License for the specific language governing permissions and
12cac7dca0Sopenharmony_ci// limitations under the License.
13cac7dca0Sopenharmony_ci
14cac7dca0Sopenharmony_ciuse std::future::Future;
15cac7dca0Sopenharmony_ciuse std::panic;
16cac7dca0Sopenharmony_ciuse std::ptr::NonNull;
17cac7dca0Sopenharmony_ciuse std::task::{Context, Poll, Waker};
18cac7dca0Sopenharmony_ci
19cac7dca0Sopenharmony_ciuse crate::error::{ErrorKind, ScheduleError};
20cac7dca0Sopenharmony_ciuse crate::executor::Schedule;
21cac7dca0Sopenharmony_ciuse crate::task::raw::{Header, Inner, TaskMngInfo};
22cac7dca0Sopenharmony_ciuse crate::task::state;
23cac7dca0Sopenharmony_ciuse crate::task::state::StateAction;
24cac7dca0Sopenharmony_ciuse crate::task::waker::WakerRefHeader;
25cac7dca0Sopenharmony_ci
26cac7dca0Sopenharmony_cicfg_not_ffrt! {
27cac7dca0Sopenharmony_ci    use crate::task::Task;
28cac7dca0Sopenharmony_ci}
29cac7dca0Sopenharmony_ci
30cac7dca0Sopenharmony_cipub(crate) struct TaskHandle<T: Future, S: Schedule> {
31cac7dca0Sopenharmony_ci    task: NonNull<TaskMngInfo<T, S>>,
32cac7dca0Sopenharmony_ci}
33cac7dca0Sopenharmony_ci
34cac7dca0Sopenharmony_ciimpl<T, S> TaskHandle<T, S>
35cac7dca0Sopenharmony_ciwhere
36cac7dca0Sopenharmony_ci    T: Future,
37cac7dca0Sopenharmony_ci    S: Schedule,
38cac7dca0Sopenharmony_ci{
39cac7dca0Sopenharmony_ci    pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Self {
40cac7dca0Sopenharmony_ci        TaskHandle {
41cac7dca0Sopenharmony_ci            task: ptr.cast::<TaskMngInfo<T, S>>(),
42cac7dca0Sopenharmony_ci        }
43cac7dca0Sopenharmony_ci    }
44cac7dca0Sopenharmony_ci
45cac7dca0Sopenharmony_ci    fn header(&self) -> &Header {
46cac7dca0Sopenharmony_ci        unsafe { self.task.as_ref().header() }
47cac7dca0Sopenharmony_ci    }
48cac7dca0Sopenharmony_ci
49cac7dca0Sopenharmony_ci    fn inner(&self) -> &Inner<T, S> {
50cac7dca0Sopenharmony_ci        unsafe { self.task.as_ref().inner() }
51cac7dca0Sopenharmony_ci    }
52cac7dca0Sopenharmony_ci}
53cac7dca0Sopenharmony_ci
54cac7dca0Sopenharmony_ciimpl<T, S> TaskHandle<T, S>
55cac7dca0Sopenharmony_ciwhere
56cac7dca0Sopenharmony_ci    T: Future,
57cac7dca0Sopenharmony_ci    S: Schedule,
58cac7dca0Sopenharmony_ci{
59cac7dca0Sopenharmony_ci    fn finish(self, state: usize, output: Result<T::Output, ScheduleError>) {
60cac7dca0Sopenharmony_ci        // send result if the JoinHandle is not dropped
61cac7dca0Sopenharmony_ci        if state::is_care_join_handle(state) {
62cac7dca0Sopenharmony_ci            self.inner().send_result(output);
63cac7dca0Sopenharmony_ci        } else {
64cac7dca0Sopenharmony_ci            self.inner().turning_to_used_data();
65cac7dca0Sopenharmony_ci        }
66cac7dca0Sopenharmony_ci
67cac7dca0Sopenharmony_ci        let cur = match self.header().state.turning_to_finish() {
68cac7dca0Sopenharmony_ci            Ok(cur) => cur,
69cac7dca0Sopenharmony_ci            Err(e) => panic!("{}", e.as_str()),
70cac7dca0Sopenharmony_ci        };
71cac7dca0Sopenharmony_ci
72cac7dca0Sopenharmony_ci        if state::is_set_waker(cur) {
73cac7dca0Sopenharmony_ci            self.inner().wake_join();
74cac7dca0Sopenharmony_ci        }
75cac7dca0Sopenharmony_ci        self.drop_ref();
76cac7dca0Sopenharmony_ci    }
77cac7dca0Sopenharmony_ci
78cac7dca0Sopenharmony_ci    pub(crate) fn release(self) {
79cac7dca0Sopenharmony_ci        unsafe { drop(Box::from_raw(self.task.as_ptr())) };
80cac7dca0Sopenharmony_ci    }
81cac7dca0Sopenharmony_ci
82cac7dca0Sopenharmony_ci    pub(crate) fn drop_ref(self) {
83cac7dca0Sopenharmony_ci        let prev = self.header().state.dec_ref();
84cac7dca0Sopenharmony_ci        if state::is_last_ref_count(prev) {
85cac7dca0Sopenharmony_ci            self.release();
86cac7dca0Sopenharmony_ci        }
87cac7dca0Sopenharmony_ci    }
88cac7dca0Sopenharmony_ci
89cac7dca0Sopenharmony_ci    pub(crate) fn get_result(self, out: &mut Poll<std::result::Result<T::Output, ScheduleError>>) {
90cac7dca0Sopenharmony_ci        *out = Poll::Ready(self.inner().turning_to_get_data());
91cac7dca0Sopenharmony_ci    }
92cac7dca0Sopenharmony_ci
93cac7dca0Sopenharmony_ci    pub(crate) fn drop_join_handle(self) {
94cac7dca0Sopenharmony_ci        if self.header().state.try_turning_to_un_join_handle() {
95cac7dca0Sopenharmony_ci            return;
96cac7dca0Sopenharmony_ci        }
97cac7dca0Sopenharmony_ci
98cac7dca0Sopenharmony_ci        match self.header().state.turn_to_un_join_handle() {
99cac7dca0Sopenharmony_ci            Ok(_) => {}
100cac7dca0Sopenharmony_ci            Err(_) => {
101cac7dca0Sopenharmony_ci                self.inner().turning_to_used_data();
102cac7dca0Sopenharmony_ci            }
103cac7dca0Sopenharmony_ci        }
104cac7dca0Sopenharmony_ci        self.drop_ref();
105cac7dca0Sopenharmony_ci    }
106cac7dca0Sopenharmony_ci
107cac7dca0Sopenharmony_ci    fn set_waker_inner(&self, des_waker: Waker, cur_state: usize) -> Result<usize, usize> {
108cac7dca0Sopenharmony_ci        assert!(
109cac7dca0Sopenharmony_ci            state::is_care_join_handle(cur_state),
110cac7dca0Sopenharmony_ci            "set waker failed: the join handle has been dropped"
111cac7dca0Sopenharmony_ci        );
112cac7dca0Sopenharmony_ci        assert!(
113cac7dca0Sopenharmony_ci            !state::is_set_waker(cur_state),
114cac7dca0Sopenharmony_ci            "set waker failed: the task already has a waker set"
115cac7dca0Sopenharmony_ci        );
116cac7dca0Sopenharmony_ci
117cac7dca0Sopenharmony_ci        unsafe {
118cac7dca0Sopenharmony_ci            let waker = self.inner().waker.get();
119cac7dca0Sopenharmony_ci            *waker = Some(des_waker);
120cac7dca0Sopenharmony_ci        }
121cac7dca0Sopenharmony_ci        let result = self.header().state.turn_to_set_waker();
122cac7dca0Sopenharmony_ci        if result.is_err() {
123cac7dca0Sopenharmony_ci            unsafe {
124cac7dca0Sopenharmony_ci                let waker = self.inner().waker.get();
125cac7dca0Sopenharmony_ci                *waker = None;
126cac7dca0Sopenharmony_ci            }
127cac7dca0Sopenharmony_ci        }
128cac7dca0Sopenharmony_ci        result
129cac7dca0Sopenharmony_ci    }
130cac7dca0Sopenharmony_ci
131cac7dca0Sopenharmony_ci    pub(crate) fn set_waker(self, cur: usize, des_waker: &Waker) -> bool {
132cac7dca0Sopenharmony_ci        let res = if state::is_set_waker(cur) {
133cac7dca0Sopenharmony_ci            let is_same_waker = unsafe {
134cac7dca0Sopenharmony_ci                // the status is set_waker, so waker must be set already
135cac7dca0Sopenharmony_ci                let waker = self.inner().waker.get();
136cac7dca0Sopenharmony_ci                (*waker)
137cac7dca0Sopenharmony_ci                    .as_ref()
138cac7dca0Sopenharmony_ci                    .expect("task status is set_waker, but waker is missing")
139cac7dca0Sopenharmony_ci                    .will_wake(des_waker)
140cac7dca0Sopenharmony_ci            };
141cac7dca0Sopenharmony_ci            // we don't register the same waker
142cac7dca0Sopenharmony_ci            if is_same_waker {
143cac7dca0Sopenharmony_ci                return false;
144cac7dca0Sopenharmony_ci            }
145cac7dca0Sopenharmony_ci            self.header()
146cac7dca0Sopenharmony_ci                .state
147cac7dca0Sopenharmony_ci                .turn_to_un_set_waker()
148cac7dca0Sopenharmony_ci                .and_then(|cur| self.set_waker_inner(des_waker.clone(), cur))
149cac7dca0Sopenharmony_ci        } else {
150cac7dca0Sopenharmony_ci            self.set_waker_inner(des_waker.clone(), cur)
151cac7dca0Sopenharmony_ci        };
152cac7dca0Sopenharmony_ci
153cac7dca0Sopenharmony_ci        if let Err(cur) = res {
154cac7dca0Sopenharmony_ci            assert!(
155cac7dca0Sopenharmony_ci                state::is_finished(cur),
156cac7dca0Sopenharmony_ci                "setting waker should only be failed dur to task completion"
157cac7dca0Sopenharmony_ci            );
158cac7dca0Sopenharmony_ci            return true;
159cac7dca0Sopenharmony_ci        }
160cac7dca0Sopenharmony_ci
161cac7dca0Sopenharmony_ci        false
162cac7dca0Sopenharmony_ci    }
163cac7dca0Sopenharmony_ci}
164cac7dca0Sopenharmony_ci
165cac7dca0Sopenharmony_ci#[cfg(not(feature = "ffrt"))]
166cac7dca0Sopenharmony_ciimpl<T, S> TaskHandle<T, S>
167cac7dca0Sopenharmony_ciwhere
168cac7dca0Sopenharmony_ci    T: Future,
169cac7dca0Sopenharmony_ci    S: Schedule,
170cac7dca0Sopenharmony_ci{
171cac7dca0Sopenharmony_ci    // Runs the task
172cac7dca0Sopenharmony_ci    pub(crate) fn run(self) {
173cac7dca0Sopenharmony_ci        let action = self.header().state.turning_to_running();
174cac7dca0Sopenharmony_ci
175cac7dca0Sopenharmony_ci        match action {
176cac7dca0Sopenharmony_ci            StateAction::Success => {}
177cac7dca0Sopenharmony_ci            StateAction::Canceled(cur) => {
178cac7dca0Sopenharmony_ci                let output = self.get_canceled();
179cac7dca0Sopenharmony_ci                return self.finish(cur, Err(output));
180cac7dca0Sopenharmony_ci            }
181cac7dca0Sopenharmony_ci            StateAction::Failed(state) => panic!("task state invalid: {state}"),
182cac7dca0Sopenharmony_ci            _ => unreachable!(),
183cac7dca0Sopenharmony_ci        };
184cac7dca0Sopenharmony_ci
185cac7dca0Sopenharmony_ci        // turn the task header into a waker
186cac7dca0Sopenharmony_ci        let waker = WakerRefHeader::<'_>::new::<T>(self.header());
187cac7dca0Sopenharmony_ci        let mut context = Context::from_waker(&waker);
188cac7dca0Sopenharmony_ci
189cac7dca0Sopenharmony_ci        let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
190cac7dca0Sopenharmony_ci            self.inner().poll(&mut context).map(Ok)
191cac7dca0Sopenharmony_ci        }));
192cac7dca0Sopenharmony_ci
193cac7dca0Sopenharmony_ci        let cur = self.header().state.get_current_state();
194cac7dca0Sopenharmony_ci        match res {
195cac7dca0Sopenharmony_ci            Ok(Poll::Ready(output)) => {
196cac7dca0Sopenharmony_ci                // send result if the JoinHandle is not dropped
197cac7dca0Sopenharmony_ci                self.finish(cur, output);
198cac7dca0Sopenharmony_ci            }
199cac7dca0Sopenharmony_ci
200cac7dca0Sopenharmony_ci            Ok(Poll::Pending) => match self.header().state.turning_to_idle() {
201cac7dca0Sopenharmony_ci                StateAction::Enqueue => {
202cac7dca0Sopenharmony_ci                    self.get_scheduled(true);
203cac7dca0Sopenharmony_ci                }
204cac7dca0Sopenharmony_ci                StateAction::Failed(state) => panic!("task state invalid: {state}"),
205cac7dca0Sopenharmony_ci                StateAction::Canceled(state) => {
206cac7dca0Sopenharmony_ci                    let output = self.get_canceled();
207cac7dca0Sopenharmony_ci                    self.finish(state, Err(output));
208cac7dca0Sopenharmony_ci                }
209cac7dca0Sopenharmony_ci                _ => {}
210cac7dca0Sopenharmony_ci            },
211cac7dca0Sopenharmony_ci
212cac7dca0Sopenharmony_ci            Err(_) => {
213cac7dca0Sopenharmony_ci                let output = Err(ScheduleError::new(ErrorKind::Panic, "panic happen"));
214cac7dca0Sopenharmony_ci                self.finish(cur, output);
215cac7dca0Sopenharmony_ci            }
216cac7dca0Sopenharmony_ci        }
217cac7dca0Sopenharmony_ci    }
218cac7dca0Sopenharmony_ci
219cac7dca0Sopenharmony_ci    pub(crate) unsafe fn shutdown(self) {
220cac7dca0Sopenharmony_ci        // Check if the JoinHandle gets dropped already. If JoinHandle is still there,
221cac7dca0Sopenharmony_ci        // wakes the JoinHandle.
222cac7dca0Sopenharmony_ci        let cur = self.header().state.dec_ref();
223cac7dca0Sopenharmony_ci        if state::ref_count(cur) > 0 && state::is_care_join_handle(cur) {
224cac7dca0Sopenharmony_ci            self.set_canceled();
225cac7dca0Sopenharmony_ci        } else {
226cac7dca0Sopenharmony_ci            self.release();
227cac7dca0Sopenharmony_ci        }
228cac7dca0Sopenharmony_ci    }
229cac7dca0Sopenharmony_ci
230cac7dca0Sopenharmony_ci    pub(crate) fn wake(self) {
231cac7dca0Sopenharmony_ci        self.wake_by_ref();
232cac7dca0Sopenharmony_ci        self.drop_ref();
233cac7dca0Sopenharmony_ci    }
234cac7dca0Sopenharmony_ci
235cac7dca0Sopenharmony_ci    pub(crate) fn wake_by_ref(&self) {
236cac7dca0Sopenharmony_ci        let prev = self.header().state.turn_to_scheduling();
237cac7dca0Sopenharmony_ci        if state::need_enqueue(prev) {
238cac7dca0Sopenharmony_ci            self.get_scheduled(false);
239cac7dca0Sopenharmony_ci        }
240cac7dca0Sopenharmony_ci    }
241cac7dca0Sopenharmony_ci
242cac7dca0Sopenharmony_ci    // Actually cancels the task during running
243cac7dca0Sopenharmony_ci    fn get_canceled(&self) -> ScheduleError {
244cac7dca0Sopenharmony_ci        self.inner().turning_to_used_data();
245cac7dca0Sopenharmony_ci        ErrorKind::TaskCanceled.into()
246cac7dca0Sopenharmony_ci    }
247cac7dca0Sopenharmony_ci
248cac7dca0Sopenharmony_ci    // Sets task state into canceled and scheduled
249cac7dca0Sopenharmony_ci    pub(crate) fn set_canceled(&self) {
250cac7dca0Sopenharmony_ci        if self.header().state.turn_to_canceled_and_scheduled() {
251cac7dca0Sopenharmony_ci            self.get_scheduled(false);
252cac7dca0Sopenharmony_ci        }
253cac7dca0Sopenharmony_ci    }
254cac7dca0Sopenharmony_ci
255cac7dca0Sopenharmony_ci    fn to_task(&self) -> Task {
256cac7dca0Sopenharmony_ci        unsafe { Task::from_raw(self.header().into()) }
257cac7dca0Sopenharmony_ci    }
258cac7dca0Sopenharmony_ci
259cac7dca0Sopenharmony_ci    fn get_scheduled(&self, lifo: bool) {
260cac7dca0Sopenharmony_ci        // the scheduler must exist when calling this method
261cac7dca0Sopenharmony_ci        self.inner()
262cac7dca0Sopenharmony_ci            .scheduler
263cac7dca0Sopenharmony_ci            .upgrade()
264cac7dca0Sopenharmony_ci            .expect("the scheduler has already been dropped")
265cac7dca0Sopenharmony_ci            .schedule(self.to_task(), lifo);
266cac7dca0Sopenharmony_ci    }
267cac7dca0Sopenharmony_ci}
268cac7dca0Sopenharmony_ci
269cac7dca0Sopenharmony_ci#[cfg(feature = "ffrt")]
270cac7dca0Sopenharmony_ciimpl<T, S> TaskHandle<T, S>
271cac7dca0Sopenharmony_ciwhere
272cac7dca0Sopenharmony_ci    T: Future,
273cac7dca0Sopenharmony_ci    S: Schedule,
274cac7dca0Sopenharmony_ci{
275cac7dca0Sopenharmony_ci    pub(crate) fn ffrt_run(self) -> bool {
276cac7dca0Sopenharmony_ci        self.inner().get_task_ctx();
277cac7dca0Sopenharmony_ci
278cac7dca0Sopenharmony_ci        match self.header().state.turning_to_running() {
279cac7dca0Sopenharmony_ci            StateAction::Failed(state) => panic!("turning to running failed: {:b}", state),
280cac7dca0Sopenharmony_ci            StateAction::Canceled(cur) => {
281cac7dca0Sopenharmony_ci                let output = self.ffrt_get_canceled();
282cac7dca0Sopenharmony_ci                self.finish(cur, Err(output));
283cac7dca0Sopenharmony_ci                return true;
284cac7dca0Sopenharmony_ci            }
285cac7dca0Sopenharmony_ci            _ => {}
286cac7dca0Sopenharmony_ci        }
287cac7dca0Sopenharmony_ci
288cac7dca0Sopenharmony_ci        let waker = WakerRefHeader::<'_>::new::<T>(self.header());
289cac7dca0Sopenharmony_ci        let mut context = Context::from_waker(&waker);
290cac7dca0Sopenharmony_ci
291cac7dca0Sopenharmony_ci        let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
292cac7dca0Sopenharmony_ci            self.inner().poll(&mut context).map(Ok)
293cac7dca0Sopenharmony_ci        }));
294cac7dca0Sopenharmony_ci
295cac7dca0Sopenharmony_ci        let cur = self.header().state.get_current_state();
296cac7dca0Sopenharmony_ci        match res {
297cac7dca0Sopenharmony_ci            Ok(Poll::Ready(output)) => {
298cac7dca0Sopenharmony_ci                // send result if the JoinHandle is not dropped
299cac7dca0Sopenharmony_ci                self.finish(cur, output);
300cac7dca0Sopenharmony_ci                true
301cac7dca0Sopenharmony_ci            }
302cac7dca0Sopenharmony_ci
303cac7dca0Sopenharmony_ci            Ok(Poll::Pending) => match self.header().state.turning_to_idle() {
304cac7dca0Sopenharmony_ci                StateAction::Enqueue => {
305cac7dca0Sopenharmony_ci                    let ffrt_task = unsafe { (*self.inner().task.get()).as_ref().unwrap() };
306cac7dca0Sopenharmony_ci                    ffrt_task.wake_task();
307cac7dca0Sopenharmony_ci                    false
308cac7dca0Sopenharmony_ci                }
309cac7dca0Sopenharmony_ci                StateAction::Failed(state) => panic!("task state invalid: {:b}", state),
310cac7dca0Sopenharmony_ci                StateAction::Canceled(state) => {
311cac7dca0Sopenharmony_ci                    let output = self.ffrt_get_canceled();
312cac7dca0Sopenharmony_ci                    self.finish(state, Err(output));
313cac7dca0Sopenharmony_ci                    true
314cac7dca0Sopenharmony_ci                }
315cac7dca0Sopenharmony_ci                _ => false,
316cac7dca0Sopenharmony_ci            },
317cac7dca0Sopenharmony_ci
318cac7dca0Sopenharmony_ci            Err(_) => {
319cac7dca0Sopenharmony_ci                let output = Err(ScheduleError::new(ErrorKind::Panic, "panic happen"));
320cac7dca0Sopenharmony_ci                self.finish(cur, output);
321cac7dca0Sopenharmony_ci                true
322cac7dca0Sopenharmony_ci            }
323cac7dca0Sopenharmony_ci        }
324cac7dca0Sopenharmony_ci    }
325cac7dca0Sopenharmony_ci
326cac7dca0Sopenharmony_ci    pub(crate) fn ffrt_wake(self) {
327cac7dca0Sopenharmony_ci        self.ffrt_wake_by_ref();
328cac7dca0Sopenharmony_ci        self.drop_ref();
329cac7dca0Sopenharmony_ci    }
330cac7dca0Sopenharmony_ci
331cac7dca0Sopenharmony_ci    pub(crate) fn ffrt_wake_by_ref(&self) {
332cac7dca0Sopenharmony_ci        let prev = self.header().state.turn_to_scheduling();
333cac7dca0Sopenharmony_ci        if state::need_enqueue(prev) {
334cac7dca0Sopenharmony_ci            let ffrt_task = unsafe { (*self.inner().task.get()).as_ref().unwrap() };
335cac7dca0Sopenharmony_ci            ffrt_task.wake_task();
336cac7dca0Sopenharmony_ci        }
337cac7dca0Sopenharmony_ci    }
338cac7dca0Sopenharmony_ci
339cac7dca0Sopenharmony_ci    // Actually cancels the task during running
340cac7dca0Sopenharmony_ci    fn ffrt_get_canceled(&self) -> ScheduleError {
341cac7dca0Sopenharmony_ci        self.inner().turning_to_used_data();
342cac7dca0Sopenharmony_ci        ErrorKind::TaskCanceled.into()
343cac7dca0Sopenharmony_ci    }
344cac7dca0Sopenharmony_ci
345cac7dca0Sopenharmony_ci    // Sets task state into canceled and scheduled
346cac7dca0Sopenharmony_ci    pub(crate) fn ffrt_set_canceled(&self) {
347cac7dca0Sopenharmony_ci        if self.header().state.turn_to_canceled_and_scheduled() {
348cac7dca0Sopenharmony_ci            let ffrt_task = unsafe { (*self.inner().task.get()).as_ref().unwrap() };
349cac7dca0Sopenharmony_ci            ffrt_task.wake_task();
350cac7dca0Sopenharmony_ci        }
351cac7dca0Sopenharmony_ci    }
352cac7dca0Sopenharmony_ci}
353