1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "base/thread/background_task_executor.h"
17
18 #include <pthread.h>
19
20 #include "base/log/log.h"
21 #include "base/memory/memory_monitor.h"
22 #include "base/thread/frame_trace_adapter.h"
23
24 namespace OHOS::Ace {
25 namespace {
26
27 constexpr size_t MAX_BACKGROUND_THREADS = 8;
28 constexpr uint32_t PURGE_FLAG_MASK = (1 << MAX_BACKGROUND_THREADS) - 1;
29
SetThreadName(uint32_t threadNo)30 void SetThreadName(uint32_t threadNo)
31 {
32 std::string name("ace.bg.");
33 name.append(std::to_string(threadNo));
34 #if defined(MAC_PLATFORM) || defined(IOS_PLATFORM)
35 pthread_setname_np(name.c_str());
36 #else
37 pthread_setname_np(pthread_self(), name.c_str());
38 #endif
39 }
40
41 } // namespace
42
GetInstance()43 BackgroundTaskExecutor& BackgroundTaskExecutor::GetInstance()
44 {
45 static BackgroundTaskExecutor instance;
46 return instance;
47 }
48
BackgroundTaskExecutor()49 BackgroundTaskExecutor::BackgroundTaskExecutor() : maxThreadNum_(MAX_BACKGROUND_THREADS)
50 {
51 FrameTraceAdapter* ft = FrameTraceAdapter::GetInstance();
52 if (ft != nullptr && ft->IsEnabled()) {
53 return;
54 } else {
55 LOGI("Create ace bg threads pool.");
56 if (maxThreadNum_ > 1) {
57 // Start other threads in the first created thread.
58 PostTask([this, num = maxThreadNum_ - 1]() { StartNewThreads(num); });
59 }
60
61 // Make sure there is at least 1 thread in background thread pool.
62 StartNewThreads(1);
63 }
64 }
65
~BackgroundTaskExecutor()66 BackgroundTaskExecutor::~BackgroundTaskExecutor()
67 {
68 std::list<std::thread> threads;
69
70 {
71 std::lock_guard<std::mutex> lock(mutex_);
72 running_ = false;
73 condition_.notify_all();
74 threads = std::move(threads_);
75 }
76
77 for (auto& threadInPool : threads) {
78 threadInPool.join();
79 }
80 }
81
PostTask(Task&& task, BgTaskPriority priority)82 bool BackgroundTaskExecutor::PostTask(Task&& task, BgTaskPriority priority)
83 {
84 if (!task) {
85 return false;
86 }
87
88 std::lock_guard<std::mutex> lock(mutex_);
89 if (!running_) {
90 return false;
91 }
92 FrameTraceAdapter* ft = FrameTraceAdapter::GetInstance();
93 if (ft != nullptr && ft->IsEnabled()) {
94 switch (priority) {
95 case BgTaskPriority::LOW:
96 ft->SlowExecute(std::move(task));
97 break;
98 default:
99 ft->QuickExecute(std::move(task));
100 break;
101 }
102 return true;
103 }
104 switch (priority) {
105 case BgTaskPriority::LOW:
106 lowPriorityTasks_.emplace_back(std::move(task));
107 break;
108 default:
109 tasks_.emplace_back(std::move(task));
110 break;
111 }
112 condition_.notify_one();
113 return true;
114 }
115
PostTask(const Task& task, BgTaskPriority priority)116 bool BackgroundTaskExecutor::PostTask(const Task& task, BgTaskPriority priority)
117 {
118 if (!task) {
119 return false;
120 }
121
122 std::lock_guard<std::mutex> lock(mutex_);
123 if (!running_) {
124 return false;
125 }
126 FrameTraceAdapter* ft = FrameTraceAdapter::GetInstance();
127 if (ft != nullptr && ft->IsEnabled()) {
128 Task variableTask = task;
129 switch (priority) {
130 case BgTaskPriority::LOW:
131 ft->SlowExecute(std::move(variableTask));
132 break;
133 default:
134 ft->QuickExecute(std::move(variableTask));
135 }
136 return true;
137 }
138 switch (priority) {
139 case BgTaskPriority::LOW:
140 lowPriorityTasks_.emplace_back(task);
141 break;
142 default:
143 tasks_.emplace_back(task);
144 break;
145 }
146 condition_.notify_one();
147 return true;
148 }
149
StartNewThreads(size_t num)150 void BackgroundTaskExecutor::StartNewThreads(size_t num)
151 {
152 uint32_t currentThreadNo = 0;
153
154 {
155 std::lock_guard<std::mutex> lock(mutex_);
156 if (!running_ || currentThreadNum_ >= maxThreadNum_) {
157 return;
158 }
159 if (currentThreadNum_ + num > maxThreadNum_) {
160 num = maxThreadNum_ - currentThreadNum_;
161 }
162 currentThreadNo = currentThreadNum_ + 1;
163 currentThreadNum_ += num;
164 }
165
166 // Start new threads.
167 std::list<std::thread> newThreads;
168 for (size_t idx = 0; idx < num; ++idx) {
169 newThreads.emplace_back(std::bind(&BackgroundTaskExecutor::ThreadLoop, this, currentThreadNo + idx));
170 }
171
172 {
173 std::lock_guard<std::mutex> lock(mutex_);
174 if (running_) {
175 threads_.splice(threads_.end(), newThreads);
176 }
177 }
178
179 for (auto& newThread : newThreads) {
180 // Join the new thread if stop running.
181 if (newThread.joinable()) {
182 newThread.join();
183 }
184 }
185 }
186
ThreadLoop(uint32_t threadNo)187 void BackgroundTaskExecutor::ThreadLoop(uint32_t threadNo)
188 {
189 if (threadNo == 0) {
190 return;
191 }
192 SetThreadName(threadNo);
193 Task task;
194 const uint32_t purgeFlag = (1u << (threadNo - 1u));
195 std::unique_lock<std::mutex> lock(mutex_);
196 while (running_) {
197 if (tasks_.empty() && lowPriorityTasks_.empty()) {
198 if ((purgeFlags_ & purgeFlag) != purgeFlag) {
199 condition_.wait(lock);
200 continue;
201 }
202
203 lock.unlock();
204 PurgeMallocCache();
205 lock.lock();
206 purgeFlags_ &= ~purgeFlag;
207 continue;
208 }
209 // deal with tasks_ first. do lowPriorityTasks_ only when all tasks_ done.
210 if (!tasks_.empty()) {
211 task = std::move(tasks_.front());
212 tasks_.pop_front();
213 } else {
214 task = std::move(lowPriorityTasks_.front());
215 lowPriorityTasks_.pop_front();
216 }
217
218 lock.unlock();
219 // Execute the task and clear after execution.
220 task();
221 task = nullptr;
222 lock.lock();
223 }
224 }
225
TriggerGarbageCollection()226 void BackgroundTaskExecutor::TriggerGarbageCollection()
227 {
228 std::lock_guard<std::mutex> lock(mutex_);
229 purgeFlags_ = PURGE_FLAG_MASK;
230 condition_.notify_all();
231 }
232
233 } // namespace OHOS::Ace
234