1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "queue_monitor.h"
16 #include <sstream>
17 #include "dfx/log/ffrt_log_api.h"
18 #include "util/slab.h"
19 #include "sync/sync.h"
20 #include "c/ffrt_dump.h"
21 #include "dfx/sysevent/sysevent.h"
22 #include "internal_inc/osal.h"
23
namespace {
// Size of the stack buffer handed to GetProcessName() when a timeout report is built.
constexpr int PROCESS_NAME_BUFFER_LENGTH = 1024;
// Task id stored in a queue slot that currently has no task recorded.
constexpr uint32_t INVALID_TASK_ID = 0;
// Multiplier applied to the configured timeout threshold to obtain microseconds.
constexpr uint32_t TIME_CONVERT_UNIT = 1000;
// Number of queue slots reserved up front in the bookkeeping vectors.
constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY = 64;
// Tolerance subtracted from the timeout when judging a task, and the retry delay for re-arming the timer.
constexpr uint64_t ALLOW_TIME_ACC_ERROR_US = 500;
// Smallest timeout for which the watchdog is armed; also the destructor's polling interval.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000;
// Maximum number of polling rounds the destructor spends trying to cancel the timer.
constexpr uint64_t DESTRUCT_TRY_COUNT = 100;

/**
 * Returns the steady-clock time point that lies delayUs microseconds after "now".
 */
inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs)
{
    const auto offset = std::chrono::microseconds(delayUs);
    return std::chrono::steady_clock::now() + offset;
}
}
38
39 namespace ffrt {
QueueMonitor()40 QueueMonitor::QueueMonitor()
41 {
42 FFRT_LOGI("queue monitor ctor enter");
43 queuesRunningInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
44 queuesStructInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
45 lastReportedTask_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
46 we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
47 uint64_t timeout = ffrt_task_timeout_get_threshold() * TIME_CONVERT_UNIT;
48 if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
49 timeoutUs_ = 0;
50 FFRT_LOGE("failed to setup watchdog because [%llu] us less than precision threshold", timeout);
51 return;
52 }
53 timeoutUs_ = timeout;
54 SendDelayedWorker(GetDelayedTimeStamp(timeoutUs_));
55 FFRT_LOGI("queue monitor ctor leave, watchdog timeout %llu us", timeoutUs_);
56 }
57
~QueueMonitor()58 QueueMonitor::~QueueMonitor()
59 {
60 exit_.store(true);
61 FFRT_LOGI("destruction of QueueMonitor enter");
62 int tryCnt = DESTRUCT_TRY_COUNT;
63 // 取消定时器成功,或者中断了发送定时器,则释放we完成析构
64 while (!DelayedRemove(we_->tp, we_) && !abortSendTimer_.load()) {
65 if (--tryCnt < 0) {
66 break;
67 }
68 usleep(MIN_TIMEOUT_THRESHOLD_US);
69 }
70 SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
71 FFRT_LOGI("destruction of QueueMonitor leave");
72 }
73
GetInstance()74 QueueMonitor& QueueMonitor::GetInstance()
75 {
76 static QueueMonitor instance;
77 return instance;
78 }
79
RegisterQueueId(uint32_t queueId, QueueHandler* queueStruct)80 void QueueMonitor::RegisterQueueId(uint32_t queueId, QueueHandler* queueStruct)
81 {
82 std::unique_lock lock(mutex_);
83 if (queueId == queuesRunningInfo_.size()) {
84 queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
85 queuesStructInfo_.emplace_back(queueStruct);
86 lastReportedTask_.emplace_back(INVALID_TASK_ID);
87 FFRT_LOGD("queue registration in monitor gid=%u in turn succ", queueId);
88 return;
89 }
90
91 // only need to ensure that the corresponding info index has been initialized after constructed.
92 if (queueId > queuesRunningInfo_.size()) {
93 for (uint32_t i = queuesRunningInfo_.size(); i <= queueId; ++i) {
94 queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
95 queuesStructInfo_.emplace_back(nullptr);
96 lastReportedTask_.emplace_back(INVALID_TASK_ID);
97 }
98 queuesStructInfo_[queueId] = queueStruct;
99 }
100 if (queuesStructInfo_[queueId] == nullptr) {
101 queuesStructInfo_[queueId] = queueStruct;
102 }
103 FFRT_LOGD("queue registration in monitor gid=%u by skip succ", queueId);
104 }
105
ResetQueueInfo(uint32_t queueId)106 void QueueMonitor::ResetQueueInfo(uint32_t queueId)
107 {
108 std::shared_lock lock(mutex_);
109 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
110 "ResetQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
111 queuesRunningInfo_[queueId].first = INVALID_TASK_ID;
112 lastReportedTask_[queueId] = INVALID_TASK_ID;
113 }
114
ResetQueueStruct(uint32_t queueId)115 void QueueMonitor::ResetQueueStruct(uint32_t queueId)
116 {
117 std::shared_lock lock(mutex_);
118 FFRT_COND_DO_ERR((queuesStructInfo_.size() <= queueId), return,
119 "ResetQueueStruct queueId=%u access violation, StructInfo_.size=%u", queueId, queuesStructInfo_.size());
120 queuesStructInfo_[queueId] = nullptr;
121 }
122
UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)123 void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)
124 {
125 std::shared_lock lock(mutex_);
126 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
127 "UpdateQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
128 TimePoint now = std::chrono::steady_clock::now();
129 queuesRunningInfo_[queueId] = {taskId, now};
130 if (exit_.exchange(false)) {
131 abortSendTimer_.store(false);
132 SendDelayedWorker(now + std::chrono::microseconds(timeoutUs_));
133 }
134 }
135
QueryQueueStatus(uint32_t queueId)136 uint64_t QueueMonitor::QueryQueueStatus(uint32_t queueId)
137 {
138 std::shared_lock lock(mutex_);
139 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return INVALID_TASK_ID,
140 "QueryQueueStatus queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
141 return queuesRunningInfo_[queueId].first;
142 }
143
SendDelayedWorker(TimePoint delay)144 void QueueMonitor::SendDelayedWorker(TimePoint delay)
145 {
146 FFRT_COND_DO_ERR(exit_.load(), abortSendTimer_.store(true);
147 return;,
148 "exit_.load() is true");
149
150 we_->tp = delay;
151 we_->cb = ([this](WaitEntry* we_) { CheckQueuesStatus(); });
152
153 bool result = DelayedWakeup(we_->tp, we_, we_->cb);
154 // insurance mechanism, generally does not fail
155 while (!result) {
156 FFRT_LOGW("failed to set delayedworker because the given timestamp has passed");
157 we_->tp = GetDelayedTimeStamp(ALLOW_TIME_ACC_ERROR_US);
158 result = DelayedWakeup(we_->tp, we_, we_->cb);
159 }
160 }
161
ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)162 void QueueMonitor::ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)
163 {
164 std::unique_lock lock(mutex_);
165 if (queuesRunningInfo_[queueId].first == taskId) {
166 queuesRunningInfo_[queueId].second += std::chrono::microseconds(timeoutUs_);
167 }
168 }
169
CheckQueuesStatus()170 void QueueMonitor::CheckQueuesStatus()
171 {
172 {
173 std::unique_lock lock(mutex_);
174 auto iter = std::find_if(queuesRunningInfo_.cbegin(), queuesRunningInfo_.cend(),
175 [](const auto& pair) { return pair.first != INVALID_TASK_ID; });
176 if (iter == queuesRunningInfo_.cend()) {
177 exit_ = true;
178 return;
179 }
180 }
181
182 TimePoint oldestStartedTime = std::chrono::steady_clock::now();
183 TimePoint startThreshold = oldestStartedTime - std::chrono::microseconds(timeoutUs_ - ALLOW_TIME_ACC_ERROR_US);
184 uint64_t taskId = 0;
185 uint32_t queueRunningInfoSize = 0;
186 TimePoint taskTimestamp = oldestStartedTime;
187 {
188 std::shared_lock lock(mutex_);
189 queueRunningInfoSize = queuesRunningInfo_.size();
190 }
191
192 // Displays information about queues that hold locks for a long time.
193 for (uint32_t i = 0; i < queueRunningInfoSize; ++i) {
194 if (queuesStructInfo_[i] == nullptr || queuesStructInfo_[i]->GetQueue() == nullptr) {
195 continue;
196 }
197
198 if (!queuesStructInfo_[i]->GetQueue()->HasLock() || !queuesStructInfo_[i]->GetQueue()->IsLockTimeout()) {
199 continue;
200 }
201
202 queuesStructInfo_[i]->GetQueue()->PrintMutexOwner();
203 }
204
205 // Displays information about queues whose tasks time out.
206 for (uint32_t i = 0; i < queueRunningInfoSize; ++i) {
207 {
208 std::unique_lock lock(mutex_);
209 taskId = queuesRunningInfo_[i].first;
210 taskTimestamp = queuesRunningInfo_[i].second;
211 }
212
213 if (taskId == INVALID_TASK_ID) {
214 continue;
215 }
216
217 if (taskTimestamp < startThreshold) {
218 std::stringstream ss;
219 char processName[PROCESS_NAME_BUFFER_LENGTH];
220 GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
221 ss << "Serial_Queue_Timeout, process name:[" << processName << "], serial queue qid:[" << i
222 << "], serial task gid:[" << taskId << "], execution:[" << timeoutUs_ << "] us.";
223 if (queuesStructInfo_[i] != nullptr) {
224 ss << queuesStructInfo_[i]->GetDfxInfo();
225 }
226 FFRT_LOGE("%s", ss.str().c_str());
227 #ifdef FFRT_SEND_EVENT
228 if (lastReportedTask_[i] != taskId) {
229 lastReportedTask_[i] = taskId;
230 std::string processNameStr = std::string(processName);
231 std::string senarioName = "Serial_Queue_Timeout";
232 TaskTimeoutReport(ss, processNameStr, senarioName);
233 }
234 #endif
235 ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
236 if (func) {
237 func(taskId, ss.str().c_str(), ss.str().size());
238 }
239 // reset timeout task timestamp for next warning
240 ResetTaskTimestampAfterWarning(i, taskId);
241 continue;
242 }
243
244 if (taskTimestamp < oldestStartedTime) {
245 oldestStartedTime = taskTimestamp;
246 }
247 }
248
249 SendDelayedWorker(oldestStartedTime + std::chrono::microseconds(timeoutUs_));
250 FFRT_LOGD("global watchdog completed queue status check and scheduled the next");
251 }
252
HasQueueActive()253 bool QueueMonitor::HasQueueActive()
254 {
255 std::unique_lock lock(mutex_);
256 for (uint32_t i = 0; i < queuesRunningInfo_.size(); ++i) {
257 if (queuesRunningInfo_[i].first != INVALID_TASK_ID) {
258 return true;
259 }
260 }
261 return false;
262 }
263 } // namespace ffrt
264