1/* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "wait_queue.h" 17#include "sched/execute_ctx.h" 18#include "eu/co_routine.h" 19#include "dfx/log/ffrt_log_api.h" 20#include "ffrt_trace.h" 21#include "sync/mutex_private.h" 22#include "tm/cpu_task.h" 23 24namespace ffrt { 25TaskWithNode::TaskWithNode() 26{ 27 auto ctx = ExecuteCtx::Cur(); 28 task = ctx->task; 29} 30 31void WaitQueue::ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk, bool legacyMode, CPUEUTask* task) 32{ 33 wqlock.lock(); 34 if (legacyMode) { 35 task->blockType = BlockType::BLOCK_THREAD; 36 wn->task = task; 37 } 38 push_back(wn); 39 wqlock.unlock(); 40 { 41 std::unique_lock<std::mutex> nl(wn->wl); 42 lk->unlock(); 43 wn->cv.wait(nl); 44 } 45 lk->lock(); 46} 47 48bool WaitQueue::ThreadWaitUntil(WaitUntilEntry* wn, mutexPrivate* lk, 49 const TimePoint& tp, bool legacyMode, CPUEUTask* task) 50{ 51 bool ret = false; 52 wqlock.lock(); 53 wn->status.store(we_status::INIT, std::memory_order_release); 54 if (legacyMode) { 55 task->blockType = BlockType::BLOCK_THREAD; 56 wn->task = task; 57 } 58 push_back(wn); 59 wqlock.unlock(); 60 { 61 std::unique_lock<std::mutex> nl(wn->wl); 62 lk->unlock(); 63 if (wn->cv.wait_until(nl, tp) == std::cv_status::timeout) { 64 ret = true; 65 } 66 } 67 68 // notify scenarios wn is already pooped 69 // in addition, condition variables may be spurious woken up 70 // in this case, wn needs to be removed from the linked list 71 if (ret || wn->status.load(std::memory_order_acquire) != we_status::NOTIFING) { 72 wqlock.lock(); 73 remove(wn); 74 wqlock.unlock(); 75 } 76 lk->lock(); 77 return ret; 78} 79 80void WaitQueue::SuspendAndWait(mutexPrivate* lk) 81{ 82 ExecuteCtx* ctx = ExecuteCtx::Cur(); 83 CPUEUTask* task = ctx->task; 84 if (ThreadWaitMode(task)) { 85 ThreadWait(&ctx->wn, lk, LegacyMode(task), task); 86 return; 87 } 88 task->wue = new WaitUntilEntry(task); 89 FFRT_BLOCK_TRACER(task->gid, cnd); 90 CoWait([&](CPUEUTask* task) -> bool { 91 wqlock.lock(); 92 push_back(task->wue); 93 lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake 94 wqlock.unlock(); 95 // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed any more. 96 return true; 97 }); 98 delete task->wue; 99 task->wue = nullptr; 100 lk->lock(); 101} 102 103bool WeTimeoutProc(WaitQueue* wq, WaitUntilEntry* wue) 104{ 105 bool toWake = true; 106 107 // two kinds: 1) notify was not called, timeout grabbed the lock first; 108 if (wue->status.load(std::memory_order_acquire) == we_status::INIT) { 109 // timeout processes wue first, cv will not be processed again. timeout is responsible for destroying wue. 110 wq->remove(wue); 111 delete wue; 112 wue = nullptr; 113 } else { 114 // 2) notify enters the critical section, first writes the notify status, and then releases the lock 115 // notify is responsible for destroying wue. 116 wue->status.store(we_status::TIMEOUT_DONE, std::memory_order_release); 117 toWake = false; 118 } 119 return toWake; 120} 121 122bool WaitQueue::SuspendAndWaitUntil(mutexPrivate* lk, const TimePoint& tp) noexcept 123{ 124 bool ret = false; 125 ExecuteCtx* ctx = ExecuteCtx::Cur(); 126 CPUEUTask* task = ctx->task; 127 if (ThreadWaitMode(task)) { 128 return ThreadWaitUntil(&ctx->wn, lk, tp, LegacyMode(task), task); 129 } 130 task->wue = new WaitUntilEntry(task); 131 task->wue->hasWaitTime = true; 132 task->wue->tp = tp; 133 task->wue->cb = ([&](WaitEntry* we) { 134 WaitUntilEntry* wue = static_cast<WaitUntilEntry*>(we); 135 ffrt::CPUEUTask* task = wue->task; 136 wqlock.lock(); 137 if (!WeTimeoutProc(this, wue)) { 138 wqlock.unlock(); 139 return; 140 } 141 wqlock.unlock(); 142 FFRT_LOGD("task(%d) time is up", task->gid); 143 CoRoutineFactory::CoWakeFunc(task, true); 144 }); 145 FFRT_BLOCK_TRACER(task->gid, cnt); 146 CoWait([&](CPUEUTask* task) -> bool { 147 WaitUntilEntry* we = task->wue; 148 wqlock.lock(); 149 push_back(we); 150 lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake 151 if (DelayedWakeup(we->tp, we, we->cb)) { 152 wqlock.unlock(); 153 // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed any more. 154 return true; 155 } else { 156 if (!WeTimeoutProc(this, we)) { 157 wqlock.unlock(); 158 // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed any more. 159 return true; 160 } 161 task->wakeupTimeOut = true; 162 wqlock.unlock(); 163 // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed any more. 164 return false; 165 } 166 }); 167 ret = task->wakeupTimeOut; 168 task->wue = nullptr; 169 task->wakeupTimeOut = false; 170 lk->lock(); 171 return ret; 172} 173 174bool WaitQueue::WeNotifyProc(WaitUntilEntry* we) 175{ 176 if (!we->hasWaitTime) { 177 // For wait task without timeout, we will be deleted after the wait task wakes up. 178 return true; 179 } 180 181 WaitEntry* dwe = static_cast<WaitEntry*>(we); 182 if (!DelayedRemove(we->tp, dwe)) { 183 // Deletion of timer failed during the notify process, indicating that timer cb has been executed at this time 184 // waiting for cb execution to complete, and marking notify as being processed. 185 we->status.store(we_status::NOTIFING, std::memory_order_release); 186 wqlock.unlock(); 187 while (we->status.load(std::memory_order_acquire) != we_status::TIMEOUT_DONE) { 188 } 189 wqlock.lock(); 190 } 191 192 delete we; 193 return true; 194} 195 196void WaitQueue::Notify(bool one) noexcept 197{ 198 // the caller should assure the WaitQueue life time. 199 // this function should assure the WaitQueue do not be access after the wqlock is empty(), 200 // that mean the last wait thread/co may destory the WaitQueue. 201 // all the break out should assure the wqlock is in unlock state. 202 // the continue should assure the wqlock is in lock state. 203 wqlock.lock(); 204 for (; ;) { 205 if (empty()) { 206 wqlock.unlock(); 207 break; 208 } 209 WaitUntilEntry* we = pop_front(); 210 if (we == nullptr) { 211 wqlock.unlock(); 212 break; 213 } 214 bool isEmpty = empty(); 215 CPUEUTask* task = we->task; 216 if (ThreadNotifyMode(task) || we->weType == 2) { 217 std::unique_lock<std::mutex> lk(we->wl); 218 we->status.store(we_status::NOTIFING, std::memory_order_release); 219 if (BlockThread(task)) { 220 task->blockType = BlockType::BLOCK_COROUTINE; 221 we->task = nullptr; 222 } 223 wqlock.unlock(); 224 we->cv.notify_one(); 225 } else { 226 if (!WeNotifyProc(we)) { 227 continue; 228 } 229 wqlock.unlock(); 230 CoRoutineFactory::CoWakeFunc(task, false); 231 } 232 if (isEmpty || one) { 233 break; 234 } 235 wqlock.lock(); 236 } 237} 238 239} // namespace ffrt 240