1/*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "wait_queue.h"
17#include "sched/execute_ctx.h"
18#include "eu/co_routine.h"
19#include "dfx/log/ffrt_log_api.h"
20#include "ffrt_trace.h"
21#include "sync/mutex_private.h"
22#include "tm/cpu_task.h"
23
24namespace ffrt {
25TaskWithNode::TaskWithNode()
26{
27    auto ctx = ExecuteCtx::Cur();
28    task = ctx->task;
29}
30
31void WaitQueue::ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk, bool legacyMode, CPUEUTask* task)
32{
33    wqlock.lock();
34    if (legacyMode) {
35        task->blockType = BlockType::BLOCK_THREAD;
36        wn->task = task;
37    }
38    push_back(wn);
39    wqlock.unlock();
40    {
41        std::unique_lock<std::mutex> nl(wn->wl);
42        lk->unlock();
43        wn->cv.wait(nl);
44    }
45    lk->lock();
46}
47
48bool WaitQueue::ThreadWaitUntil(WaitUntilEntry* wn, mutexPrivate* lk,
49    const TimePoint& tp, bool legacyMode, CPUEUTask* task)
50{
51    bool ret = false;
52    wqlock.lock();
53    wn->status.store(we_status::INIT, std::memory_order_release);
54    if (legacyMode) {
55        task->blockType = BlockType::BLOCK_THREAD;
56        wn->task = task;
57    }
58    push_back(wn);
59    wqlock.unlock();
60    {
61        std::unique_lock<std::mutex> nl(wn->wl);
62        lk->unlock();
63        if (wn->cv.wait_until(nl, tp) == std::cv_status::timeout) {
64            ret = true;
65        }
66    }
67
68    // notify scenarios wn is already pooped
69    // in addition, condition variables may be spurious woken up
70    // in this case, wn needs to be removed from the linked list
71    if (ret || wn->status.load(std::memory_order_acquire) != we_status::NOTIFING) {
72        wqlock.lock();
73        remove(wn);
74        wqlock.unlock();
75    }
76    lk->lock();
77    return ret;
78}
79
80void WaitQueue::SuspendAndWait(mutexPrivate* lk)
81{
82    ExecuteCtx* ctx = ExecuteCtx::Cur();
83    CPUEUTask* task = ctx->task;
84    if (ThreadWaitMode(task)) {
85        ThreadWait(&ctx->wn, lk, LegacyMode(task), task);
86        return;
87    }
88    task->wue = new WaitUntilEntry(task);
89    FFRT_BLOCK_TRACER(task->gid, cnd);
90    CoWait([&](CPUEUTask* task) -> bool {
91        wqlock.lock();
92        push_back(task->wue);
93        lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
94        wqlock.unlock();
95        // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed any more.
96        return true;
97    });
98    delete task->wue;
99    task->wue = nullptr;
100    lk->lock();
101}
102
103bool WeTimeoutProc(WaitQueue* wq, WaitUntilEntry* wue)
104{
105    bool toWake = true;
106
107    // two kinds: 1) notify was not called, timeout grabbed the lock first;
108    if (wue->status.load(std::memory_order_acquire) == we_status::INIT) {
109        // timeout processes wue first, cv will not be processed again. timeout is responsible for destroying wue.
110        wq->remove(wue);
111        delete wue;
112        wue = nullptr;
113    } else {
114        // 2) notify enters the critical section, first writes the notify status, and then releases the lock
115        // notify is responsible for destroying wue.
116        wue->status.store(we_status::TIMEOUT_DONE, std::memory_order_release);
117        toWake = false;
118    }
119    return toWake;
120}
121
122bool WaitQueue::SuspendAndWaitUntil(mutexPrivate* lk, const TimePoint& tp) noexcept
123{
124    bool ret = false;
125    ExecuteCtx* ctx = ExecuteCtx::Cur();
126    CPUEUTask* task = ctx->task;
127    if (ThreadWaitMode(task)) {
128        return ThreadWaitUntil(&ctx->wn, lk, tp, LegacyMode(task), task);
129    }
130    task->wue = new WaitUntilEntry(task);
131    task->wue->hasWaitTime = true;
132    task->wue->tp = tp;
133    task->wue->cb = ([&](WaitEntry* we) {
134        WaitUntilEntry* wue = static_cast<WaitUntilEntry*>(we);
135        ffrt::CPUEUTask* task = wue->task;
136        wqlock.lock();
137        if (!WeTimeoutProc(this, wue)) {
138            wqlock.unlock();
139            return;
140        }
141        wqlock.unlock();
142        FFRT_LOGD("task(%d) time is up", task->gid);
143        CoRoutineFactory::CoWakeFunc(task, true);
144    });
145    FFRT_BLOCK_TRACER(task->gid, cnt);
146    CoWait([&](CPUEUTask* task) -> bool {
147        WaitUntilEntry* we = task->wue;
148        wqlock.lock();
149        push_back(we);
150        lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
151        if (DelayedWakeup(we->tp, we, we->cb)) {
152            wqlock.unlock();
153            // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed any more.
154            return true;
155        } else {
156            if (!WeTimeoutProc(this, we)) {
157                wqlock.unlock();
158                // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed any more.
159                return true;
160            }
161            task->wakeupTimeOut = true;
162            wqlock.unlock();
163            // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed any more.
164            return false;
165        }
166    });
167    ret = task->wakeupTimeOut;
168    task->wue = nullptr;
169    task->wakeupTimeOut = false;
170    lk->lock();
171    return ret;
172}
173
174bool WaitQueue::WeNotifyProc(WaitUntilEntry* we)
175{
176    if (!we->hasWaitTime) {
177        // For wait task without timeout, we will be deleted after the wait task wakes up.
178        return true;
179    }
180
181    WaitEntry* dwe = static_cast<WaitEntry*>(we);
182    if (!DelayedRemove(we->tp, dwe)) {
183        // Deletion of timer failed during the notify process, indicating that timer cb has been executed at this time
184        // waiting for cb execution to complete, and marking notify as being processed.
185        we->status.store(we_status::NOTIFING, std::memory_order_release);
186        wqlock.unlock();
187        while (we->status.load(std::memory_order_acquire) != we_status::TIMEOUT_DONE) {
188        }
189        wqlock.lock();
190    }
191
192    delete we;
193    return true;
194}
195
196void WaitQueue::Notify(bool one) noexcept
197{
198    // the caller should assure the WaitQueue life time.
199    // this function should assure the WaitQueue do not be access after the wqlock is empty(),
200    // that mean the last wait thread/co may destory the WaitQueue.
201    // all the break out should assure the wqlock is in unlock state.
202    // the continue should assure the wqlock is in lock state.
203    wqlock.lock();
204    for (; ;) {
205        if (empty()) {
206            wqlock.unlock();
207            break;
208        }
209        WaitUntilEntry* we = pop_front();
210        if (we == nullptr) {
211            wqlock.unlock();
212            break;
213        }
214        bool isEmpty = empty();
215        CPUEUTask* task = we->task;
216        if (ThreadNotifyMode(task) || we->weType == 2) {
217            std::unique_lock<std::mutex> lk(we->wl);
218            we->status.store(we_status::NOTIFING, std::memory_order_release);
219            if (BlockThread(task)) {
220                task->blockType = BlockType::BLOCK_COROUTINE;
221                we->task = nullptr;
222            }
223            wqlock.unlock();
224            we->cv.notify_one();
225        } else {
226            if (!WeNotifyProc(we)) {
227                continue;
228            }
229            wqlock.unlock();
230            CoRoutineFactory::CoWakeFunc(task, false);
231        }
232        if (isEmpty || one) {
233            break;
234        }
235        wqlock.lock();
236    }
237}
238
239} // namespace ffrt
240