1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "concurrent_queue.h"
17 #include <climits>
18 #include "dfx/log/ffrt_log_api.h"
19 #include "tm/queue_task.h"
20 #include "eu/loop.h"
21
22 namespace ffrt {
DelayTaskCb(void* task)23 static void DelayTaskCb(void* task)
24 {
25 static_cast<QueueTask*>(task)->Execute();
26 }
27
~ConcurrentQueue()28 ConcurrentQueue::~ConcurrentQueue()
29 {
30 FFRT_LOGI("destruct concurrent queueId=%u leave", queueId_);
31 }
32
Push(QueueTask* task)33 int ConcurrentQueue::Push(QueueTask* task)
34 {
35 std::unique_lock lock(mutex_);
36 FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
37 if (task->GetPriority() > ffrt_queue_priority_idle) {
38 task->SetPriority(ffrt_queue_priority_low);
39 }
40
41 if (loop_ != nullptr) {
42 if (task->GetDelay() == 0) {
43 whenMap_.insert({task->GetUptime(), task});
44 loop_->WakeUp();
45 return SUCC;
46 }
47 return PushDelayTaskToTimer(task);
48 }
49 FFRT_COND_DO_ERR(IsOnLoop(), return FAILED, "cannot push task, [queueId=%u] loop empty", queueId_);
50
51 if (concurrency_.load() < maxConcurrency_) {
52 int oldValue = concurrency_.fetch_add(1);
53 FFRT_LOGD("task [gid=%llu] concurrency[%u] + 1 [queueId=%u]", task->gid, oldValue, queueId_);
54
55 if (task->GetDelay() > 0) {
56 whenMap_.insert({task->GetUptime(), task});
57 }
58
59 return CONCURRENT;
60 }
61
62 whenMap_.insert({task->GetUptime(), task});
63 if (task == whenMap_.begin()->second) {
64 cond_.NotifyAll();
65 }
66
67 return SUCC;
68 }
69
Pull()70 QueueTask* ConcurrentQueue::Pull()
71 {
72 std::unique_lock lock(mutex_);
73 // wait for delay task
74 uint64_t now = GetNow();
75 if (loop_ != nullptr) {
76 if (!whenMap_.empty() && now >= whenMap_.begin()->first && !isExit_) {
77 return dequeFunc_(queueId_, now, whenMap_, nullptr);
78 }
79 return nullptr;
80 }
81
82 while (!whenMap_.empty() && now < whenMap_.begin()->first && !isExit_) {
83 uint64_t diff = whenMap_.begin()->first - now;
84 FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff);
85 cond_.WaitFor(lock, std::chrono::microseconds(diff));
86 FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_);
87 now = GetNow();
88 }
89
90 // abort dequeue in abnormal scenarios
91 if (whenMap_.empty()) {
92 int oldValue = concurrency_.fetch_sub(1); // 取不到后继的task,当前这个task正式退出
93 FFRT_LOGD("concurrency[%d] - 1 [queueId=%u] switch into inactive", oldValue, queueId_);
94 return nullptr;
95 }
96 FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_);
97
98 // dequeue next expired task by priority
99 return dequeFunc_(queueId_, now, whenMap_, nullptr);
100 }
101
Stop()102 void ConcurrentQueue::Stop()
103 {
104 std::unique_lock lock(mutex_);
105 isExit_ = true;
106
107 for (auto it = whenMap_.begin(); it != whenMap_.end(); it++) {
108 if (it->second) {
109 it->second->Notify();
110 it->second->Destroy();
111 }
112 }
113 whenMap_.clear();
114 if (loop_ == nullptr) {
115 cond_.NotifyAll();
116 }
117
118 FFRT_LOGI("clear [queueId=%u] succ", queueId_);
119 }
120
SetLoop(Loop* loop)121 bool ConcurrentQueue::SetLoop(Loop* loop)
122 {
123 if (loop == nullptr || loop_ != nullptr) {
124 FFRT_LOGE("queueId %s should bind to loop invalid", queueId_);
125 return false;
126 }
127
128 loop_ = loop;
129 isOnLoop_.store(true);
130 return true;
131 }
132
PushDelayTaskToTimer(QueueTask* task)133 int ConcurrentQueue::PushDelayTaskToTimer(QueueTask* task)
134 {
135 uint64_t delayMs = (task->GetDelay() - 1) / 1000 + 1;
136 int timeout = delayMs > INT_MAX ? INT_MAX : delayMs;
137 if (loop_->TimerStart(timeout, task, DelayTaskCb, false) < 0) {
138 FFRT_LOGE("push delay queue task to timer fail");
139 return FAILED;
140 }
141 return SUCC;
142 }
143
CreateConcurrentQueue(const ffrt_queue_attr_t* attr)144 std::unique_ptr<BaseQueue> CreateConcurrentQueue(const ffrt_queue_attr_t* attr)
145 {
146 int maxConcurrency = ffrt_queue_attr_get_max_concurrency(attr) <= 0 ? 1 : ffrt_queue_attr_get_max_concurrency(attr);
147 return std::make_unique<ConcurrentQueue>(maxConcurrency);
148 }
149 } // namespace ffrt
150