1484543d1Sopenharmony_ci/*
2484543d1Sopenharmony_ci * Copyright (c) 2023 Huawei Device Co., Ltd.
3484543d1Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4484543d1Sopenharmony_ci * you may not use this file except in compliance with the License.
5484543d1Sopenharmony_ci * You may obtain a copy of the License at
6484543d1Sopenharmony_ci *
7484543d1Sopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8484543d1Sopenharmony_ci *
9484543d1Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10484543d1Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11484543d1Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12484543d1Sopenharmony_ci * See the License for the specific language governing permissions and
13484543d1Sopenharmony_ci * limitations under the License.
14484543d1Sopenharmony_ci */
15484543d1Sopenharmony_ci
16484543d1Sopenharmony_ci#include "concurrent_queue.h"
17484543d1Sopenharmony_ci#include <climits>
18484543d1Sopenharmony_ci#include "dfx/log/ffrt_log_api.h"
19484543d1Sopenharmony_ci#include "tm/queue_task.h"
20484543d1Sopenharmony_ci#include "eu/loop.h"
21484543d1Sopenharmony_ci
22484543d1Sopenharmony_cinamespace ffrt {
23484543d1Sopenharmony_cistatic void DelayTaskCb(void* task)
24484543d1Sopenharmony_ci{
25484543d1Sopenharmony_ci    static_cast<QueueTask*>(task)->Execute();
26484543d1Sopenharmony_ci}
27484543d1Sopenharmony_ci
28484543d1Sopenharmony_ciConcurrentQueue::~ConcurrentQueue()
29484543d1Sopenharmony_ci{
30484543d1Sopenharmony_ci    FFRT_LOGI("destruct concurrent queueId=%u leave", queueId_);
31484543d1Sopenharmony_ci}
32484543d1Sopenharmony_ci
33484543d1Sopenharmony_ciint ConcurrentQueue::Push(QueueTask* task)
34484543d1Sopenharmony_ci{
35484543d1Sopenharmony_ci    std::unique_lock lock(mutex_);
36484543d1Sopenharmony_ci    FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
37484543d1Sopenharmony_ci    if (task->GetPriority() > ffrt_queue_priority_idle) {
38484543d1Sopenharmony_ci        task->SetPriority(ffrt_queue_priority_low);
39484543d1Sopenharmony_ci    }
40484543d1Sopenharmony_ci
41484543d1Sopenharmony_ci    if (loop_ != nullptr) {
42484543d1Sopenharmony_ci        if (task->GetDelay() == 0) {
43484543d1Sopenharmony_ci            whenMap_.insert({task->GetUptime(), task});
44484543d1Sopenharmony_ci            loop_->WakeUp();
45484543d1Sopenharmony_ci            return SUCC;
46484543d1Sopenharmony_ci        }
47484543d1Sopenharmony_ci        return PushDelayTaskToTimer(task);
48484543d1Sopenharmony_ci    }
49484543d1Sopenharmony_ci    FFRT_COND_DO_ERR(IsOnLoop(), return FAILED, "cannot push task, [queueId=%u] loop empty", queueId_);
50484543d1Sopenharmony_ci
51484543d1Sopenharmony_ci    if (concurrency_.load() < maxConcurrency_) {
52484543d1Sopenharmony_ci        int oldValue = concurrency_.fetch_add(1);
53484543d1Sopenharmony_ci        FFRT_LOGD("task [gid=%llu] concurrency[%u] + 1 [queueId=%u]", task->gid, oldValue, queueId_);
54484543d1Sopenharmony_ci
55484543d1Sopenharmony_ci        if (task->GetDelay() > 0) {
56484543d1Sopenharmony_ci            whenMap_.insert({task->GetUptime(), task});
57484543d1Sopenharmony_ci        }
58484543d1Sopenharmony_ci
59484543d1Sopenharmony_ci        return CONCURRENT;
60484543d1Sopenharmony_ci    }
61484543d1Sopenharmony_ci
62484543d1Sopenharmony_ci    whenMap_.insert({task->GetUptime(), task});
63484543d1Sopenharmony_ci    if (task == whenMap_.begin()->second) {
64484543d1Sopenharmony_ci        cond_.NotifyAll();
65484543d1Sopenharmony_ci    }
66484543d1Sopenharmony_ci
67484543d1Sopenharmony_ci    return SUCC;
68484543d1Sopenharmony_ci}
69484543d1Sopenharmony_ci
70484543d1Sopenharmony_ciQueueTask* ConcurrentQueue::Pull()
71484543d1Sopenharmony_ci{
72484543d1Sopenharmony_ci    std::unique_lock lock(mutex_);
73484543d1Sopenharmony_ci    // wait for delay task
74484543d1Sopenharmony_ci    uint64_t now = GetNow();
75484543d1Sopenharmony_ci    if (loop_ != nullptr) {
76484543d1Sopenharmony_ci        if (!whenMap_.empty() && now >= whenMap_.begin()->first && !isExit_) {
77484543d1Sopenharmony_ci            return dequeFunc_(queueId_, now, whenMap_, nullptr);
78484543d1Sopenharmony_ci        }
79484543d1Sopenharmony_ci        return nullptr;
80484543d1Sopenharmony_ci    }
81484543d1Sopenharmony_ci
82484543d1Sopenharmony_ci    while (!whenMap_.empty() && now < whenMap_.begin()->first && !isExit_) {
83484543d1Sopenharmony_ci        uint64_t diff = whenMap_.begin()->first - now;
84484543d1Sopenharmony_ci        FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff);
85484543d1Sopenharmony_ci        cond_.WaitFor(lock, std::chrono::microseconds(diff));
86484543d1Sopenharmony_ci        FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_);
87484543d1Sopenharmony_ci        now = GetNow();
88484543d1Sopenharmony_ci    }
89484543d1Sopenharmony_ci
90484543d1Sopenharmony_ci    // abort dequeue in abnormal scenarios
91484543d1Sopenharmony_ci    if (whenMap_.empty()) {
92484543d1Sopenharmony_ci        int oldValue = concurrency_.fetch_sub(1); // 取不到后继的task,当前这个task正式退出
93484543d1Sopenharmony_ci        FFRT_LOGD("concurrency[%d] - 1 [queueId=%u] switch into inactive", oldValue, queueId_);
94484543d1Sopenharmony_ci        return nullptr;
95484543d1Sopenharmony_ci    }
96484543d1Sopenharmony_ci    FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_);
97484543d1Sopenharmony_ci
98484543d1Sopenharmony_ci    // dequeue next expired task by priority
99484543d1Sopenharmony_ci    return dequeFunc_(queueId_, now, whenMap_, nullptr);
100484543d1Sopenharmony_ci}
101484543d1Sopenharmony_ci
102484543d1Sopenharmony_civoid ConcurrentQueue::Stop()
103484543d1Sopenharmony_ci{
104484543d1Sopenharmony_ci    std::unique_lock lock(mutex_);
105484543d1Sopenharmony_ci    isExit_ = true;
106484543d1Sopenharmony_ci
107484543d1Sopenharmony_ci    for (auto it = whenMap_.begin(); it != whenMap_.end(); it++) {
108484543d1Sopenharmony_ci        if (it->second) {
109484543d1Sopenharmony_ci            it->second->Notify();
110484543d1Sopenharmony_ci            it->second->Destroy();
111484543d1Sopenharmony_ci        }
112484543d1Sopenharmony_ci    }
113484543d1Sopenharmony_ci    whenMap_.clear();
114484543d1Sopenharmony_ci    if (loop_ == nullptr) {
115484543d1Sopenharmony_ci        cond_.NotifyAll();
116484543d1Sopenharmony_ci    }
117484543d1Sopenharmony_ci
118484543d1Sopenharmony_ci    FFRT_LOGI("clear [queueId=%u] succ", queueId_);
119484543d1Sopenharmony_ci}
120484543d1Sopenharmony_ci
121484543d1Sopenharmony_cibool ConcurrentQueue::SetLoop(Loop* loop)
122484543d1Sopenharmony_ci{
123484543d1Sopenharmony_ci    if (loop == nullptr || loop_ != nullptr) {
124484543d1Sopenharmony_ci        FFRT_LOGE("queueId %s should bind to loop invalid", queueId_);
125484543d1Sopenharmony_ci        return false;
126484543d1Sopenharmony_ci    }
127484543d1Sopenharmony_ci
128484543d1Sopenharmony_ci    loop_ = loop;
129484543d1Sopenharmony_ci    isOnLoop_.store(true);
130484543d1Sopenharmony_ci    return true;
131484543d1Sopenharmony_ci}
132484543d1Sopenharmony_ci
133484543d1Sopenharmony_ciint ConcurrentQueue::PushDelayTaskToTimer(QueueTask* task)
134484543d1Sopenharmony_ci{
135484543d1Sopenharmony_ci    uint64_t delayMs = (task->GetDelay() - 1) / 1000 + 1;
136484543d1Sopenharmony_ci    int timeout = delayMs > INT_MAX ? INT_MAX : delayMs;
137484543d1Sopenharmony_ci    if (loop_->TimerStart(timeout, task, DelayTaskCb, false) < 0) {
138484543d1Sopenharmony_ci        FFRT_LOGE("push delay queue task to timer fail");
139484543d1Sopenharmony_ci        return FAILED;
140484543d1Sopenharmony_ci    }
141484543d1Sopenharmony_ci    return SUCC;
142484543d1Sopenharmony_ci}
143484543d1Sopenharmony_ci
144484543d1Sopenharmony_cistd::unique_ptr<BaseQueue> CreateConcurrentQueue(const ffrt_queue_attr_t* attr)
145484543d1Sopenharmony_ci{
146484543d1Sopenharmony_ci    int maxConcurrency = ffrt_queue_attr_get_max_concurrency(attr) <= 0 ? 1 : ffrt_queue_attr_get_max_concurrency(attr);
147484543d1Sopenharmony_ci    return std::make_unique<ConcurrentQueue>(maxConcurrency);
148484543d1Sopenharmony_ci}
149484543d1Sopenharmony_ci} // namespace ffrt
150