1484543d1Sopenharmony_ci/* 2484543d1Sopenharmony_ci * Copyright (c) 2023 Huawei Device Co., Ltd. 3484543d1Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4484543d1Sopenharmony_ci * you may not use this file except in compliance with the License. 5484543d1Sopenharmony_ci * You may obtain a copy of the License at 6484543d1Sopenharmony_ci * 7484543d1Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8484543d1Sopenharmony_ci * 9484543d1Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10484543d1Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11484543d1Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12484543d1Sopenharmony_ci * See the License for the specific language governing permissions and 13484543d1Sopenharmony_ci * limitations under the License. 14484543d1Sopenharmony_ci */ 15484543d1Sopenharmony_ci 16484543d1Sopenharmony_ci#include "concurrent_queue.h" 17484543d1Sopenharmony_ci#include <climits> 18484543d1Sopenharmony_ci#include "dfx/log/ffrt_log_api.h" 19484543d1Sopenharmony_ci#include "tm/queue_task.h" 20484543d1Sopenharmony_ci#include "eu/loop.h" 21484543d1Sopenharmony_ci 22484543d1Sopenharmony_cinamespace ffrt { 23484543d1Sopenharmony_cistatic void DelayTaskCb(void* task) 24484543d1Sopenharmony_ci{ 25484543d1Sopenharmony_ci static_cast<QueueTask*>(task)->Execute(); 26484543d1Sopenharmony_ci} 27484543d1Sopenharmony_ci 28484543d1Sopenharmony_ciConcurrentQueue::~ConcurrentQueue() 29484543d1Sopenharmony_ci{ 30484543d1Sopenharmony_ci FFRT_LOGI("destruct concurrent queueId=%u leave", queueId_); 31484543d1Sopenharmony_ci} 32484543d1Sopenharmony_ci 33484543d1Sopenharmony_ciint ConcurrentQueue::Push(QueueTask* task) 34484543d1Sopenharmony_ci{ 35484543d1Sopenharmony_ci std::unique_lock lock(mutex_); 36484543d1Sopenharmony_ci FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_); 37484543d1Sopenharmony_ci if (task->GetPriority() > ffrt_queue_priority_idle) { 38484543d1Sopenharmony_ci task->SetPriority(ffrt_queue_priority_low); 39484543d1Sopenharmony_ci } 40484543d1Sopenharmony_ci 41484543d1Sopenharmony_ci if (loop_ != nullptr) { 42484543d1Sopenharmony_ci if (task->GetDelay() == 0) { 43484543d1Sopenharmony_ci whenMap_.insert({task->GetUptime(), task}); 44484543d1Sopenharmony_ci loop_->WakeUp(); 45484543d1Sopenharmony_ci return SUCC; 46484543d1Sopenharmony_ci } 47484543d1Sopenharmony_ci return PushDelayTaskToTimer(task); 48484543d1Sopenharmony_ci } 49484543d1Sopenharmony_ci FFRT_COND_DO_ERR(IsOnLoop(), return FAILED, "cannot push task, [queueId=%u] loop empty", queueId_); 50484543d1Sopenharmony_ci 51484543d1Sopenharmony_ci if (concurrency_.load() < maxConcurrency_) { 52484543d1Sopenharmony_ci int oldValue = concurrency_.fetch_add(1); 53484543d1Sopenharmony_ci FFRT_LOGD("task [gid=%llu] concurrency[%u] + 1 [queueId=%u]", task->gid, oldValue, queueId_); 54484543d1Sopenharmony_ci 55484543d1Sopenharmony_ci if (task->GetDelay() > 0) { 56484543d1Sopenharmony_ci whenMap_.insert({task->GetUptime(), task}); 57484543d1Sopenharmony_ci } 58484543d1Sopenharmony_ci 59484543d1Sopenharmony_ci return CONCURRENT; 60484543d1Sopenharmony_ci } 61484543d1Sopenharmony_ci 62484543d1Sopenharmony_ci whenMap_.insert({task->GetUptime(), task}); 63484543d1Sopenharmony_ci if (task == whenMap_.begin()->second) { 64484543d1Sopenharmony_ci cond_.NotifyAll(); 65484543d1Sopenharmony_ci } 66484543d1Sopenharmony_ci 67484543d1Sopenharmony_ci return SUCC; 68484543d1Sopenharmony_ci} 69484543d1Sopenharmony_ci 70484543d1Sopenharmony_ciQueueTask* ConcurrentQueue::Pull() 71484543d1Sopenharmony_ci{ 72484543d1Sopenharmony_ci std::unique_lock lock(mutex_); 73484543d1Sopenharmony_ci // wait for delay task 74484543d1Sopenharmony_ci uint64_t now = GetNow(); 75484543d1Sopenharmony_ci if (loop_ != nullptr) { 76484543d1Sopenharmony_ci if (!whenMap_.empty() && now >= whenMap_.begin()->first && !isExit_) { 77484543d1Sopenharmony_ci return dequeFunc_(queueId_, now, whenMap_, nullptr); 78484543d1Sopenharmony_ci } 79484543d1Sopenharmony_ci return nullptr; 80484543d1Sopenharmony_ci } 81484543d1Sopenharmony_ci 82484543d1Sopenharmony_ci while (!whenMap_.empty() && now < whenMap_.begin()->first && !isExit_) { 83484543d1Sopenharmony_ci uint64_t diff = whenMap_.begin()->first - now; 84484543d1Sopenharmony_ci FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff); 85484543d1Sopenharmony_ci cond_.WaitFor(lock, std::chrono::microseconds(diff)); 86484543d1Sopenharmony_ci FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_); 87484543d1Sopenharmony_ci now = GetNow(); 88484543d1Sopenharmony_ci } 89484543d1Sopenharmony_ci 90484543d1Sopenharmony_ci // abort dequeue in abnormal scenarios 91484543d1Sopenharmony_ci if (whenMap_.empty()) { 92484543d1Sopenharmony_ci int oldValue = concurrency_.fetch_sub(1); // 取不到后继的task,当前这个task正式退出 93484543d1Sopenharmony_ci FFRT_LOGD("concurrency[%d] - 1 [queueId=%u] switch into inactive", oldValue, queueId_); 94484543d1Sopenharmony_ci return nullptr; 95484543d1Sopenharmony_ci } 96484543d1Sopenharmony_ci FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_); 97484543d1Sopenharmony_ci 98484543d1Sopenharmony_ci // dequeue next expired task by priority 99484543d1Sopenharmony_ci return dequeFunc_(queueId_, now, whenMap_, nullptr); 100484543d1Sopenharmony_ci} 101484543d1Sopenharmony_ci 102484543d1Sopenharmony_civoid ConcurrentQueue::Stop() 103484543d1Sopenharmony_ci{ 104484543d1Sopenharmony_ci std::unique_lock lock(mutex_); 105484543d1Sopenharmony_ci isExit_ = true; 106484543d1Sopenharmony_ci 107484543d1Sopenharmony_ci for (auto it = whenMap_.begin(); it != whenMap_.end(); it++) { 108484543d1Sopenharmony_ci if (it->second) { 109484543d1Sopenharmony_ci it->second->Notify(); 110484543d1Sopenharmony_ci it->second->Destroy(); 111484543d1Sopenharmony_ci } 112484543d1Sopenharmony_ci } 113484543d1Sopenharmony_ci whenMap_.clear(); 114484543d1Sopenharmony_ci if (loop_ == nullptr) { 115484543d1Sopenharmony_ci cond_.NotifyAll(); 116484543d1Sopenharmony_ci } 117484543d1Sopenharmony_ci 118484543d1Sopenharmony_ci FFRT_LOGI("clear [queueId=%u] succ", queueId_); 119484543d1Sopenharmony_ci} 120484543d1Sopenharmony_ci 121484543d1Sopenharmony_cibool ConcurrentQueue::SetLoop(Loop* loop) 122484543d1Sopenharmony_ci{ 123484543d1Sopenharmony_ci if (loop == nullptr || loop_ != nullptr) { 124484543d1Sopenharmony_ci FFRT_LOGE("queueId %s should bind to loop invalid", queueId_); 125484543d1Sopenharmony_ci return false; 126484543d1Sopenharmony_ci } 127484543d1Sopenharmony_ci 128484543d1Sopenharmony_ci loop_ = loop; 129484543d1Sopenharmony_ci isOnLoop_.store(true); 130484543d1Sopenharmony_ci return true; 131484543d1Sopenharmony_ci} 132484543d1Sopenharmony_ci 133484543d1Sopenharmony_ciint ConcurrentQueue::PushDelayTaskToTimer(QueueTask* task) 134484543d1Sopenharmony_ci{ 135484543d1Sopenharmony_ci uint64_t delayMs = (task->GetDelay() - 1) / 1000 + 1; 136484543d1Sopenharmony_ci int timeout = delayMs > INT_MAX ? INT_MAX : delayMs; 137484543d1Sopenharmony_ci if (loop_->TimerStart(timeout, task, DelayTaskCb, false) < 0) { 138484543d1Sopenharmony_ci FFRT_LOGE("push delay queue task to timer fail"); 139484543d1Sopenharmony_ci return FAILED; 140484543d1Sopenharmony_ci } 141484543d1Sopenharmony_ci return SUCC; 142484543d1Sopenharmony_ci} 143484543d1Sopenharmony_ci 144484543d1Sopenharmony_cistd::unique_ptr<BaseQueue> CreateConcurrentQueue(const ffrt_queue_attr_t* attr) 145484543d1Sopenharmony_ci{ 146484543d1Sopenharmony_ci int maxConcurrency = ffrt_queue_attr_get_max_concurrency(attr) <= 0 ? 1 : ffrt_queue_attr_get_max_concurrency(attr); 147484543d1Sopenharmony_ci return std::make_unique<ConcurrentQueue>(maxConcurrency); 148484543d1Sopenharmony_ci} 149484543d1Sopenharmony_ci} // namespace ffrt 150