1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "queue_monitor.h"
16 #include <sstream>
17 #include "dfx/log/ffrt_log_api.h"
18 #include "util/slab.h"
19 #include "sync/sync.h"
20 #include "c/ffrt_dump.h"
21 #include "dfx/sysevent/sysevent.h"
22 #include "internal_inc/osal.h"
23 
namespace {
// Size of the stack buffer that receives the process name for timeout reports.
constexpr int PROCESS_NAME_BUFFER_LENGTH{1024};
// Task id stored for a queue slot that currently has no running task.
constexpr uint32_t INVALID_TASK_ID{0};
// Multiplier applied to the configured timeout threshold to express it in microseconds.
constexpr uint32_t TIME_CONVERT_UNIT{1000};
// Number of queue slots reserved up front in the per-queue tables.
constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY{64};
// Timing tolerance (us) used when comparing task timestamps and when re-arming a timer that was already due.
constexpr uint64_t ALLOW_TIME_ACC_ERROR_US{500};
// Smallest watchdog timeout (us) that is accepted; also the sleep interval between destructor retries.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US{1000};
// Maximum number of retries while the destructor tries to cancel the pending timer.
constexpr uint64_t DESTRUCT_TRY_COUNT{100};

// Returns the steady-clock time point that lies delayUs microseconds after the current instant.
inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs)
{
    const auto current = std::chrono::steady_clock::now();
    const std::chrono::microseconds offset(delayUs);
    return current + offset;
}
}
38 
39 namespace ffrt {
QueueMonitor()40 QueueMonitor::QueueMonitor()
41 {
42     FFRT_LOGI("queue monitor ctor enter");
43     queuesRunningInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
44     queuesStructInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
45     lastReportedTask_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
46     we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
47     uint64_t timeout = ffrt_task_timeout_get_threshold() * TIME_CONVERT_UNIT;
48     if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
49         timeoutUs_ = 0;
50         FFRT_LOGE("failed to setup watchdog because [%llu] us less than precision threshold", timeout);
51         return;
52     }
53     timeoutUs_ = timeout;
54     SendDelayedWorker(GetDelayedTimeStamp(timeoutUs_));
55     FFRT_LOGI("queue monitor ctor leave, watchdog timeout %llu us", timeoutUs_);
56 }
57 
~QueueMonitor()58 QueueMonitor::~QueueMonitor()
59 {
60     exit_.store(true);
61     FFRT_LOGI("destruction of QueueMonitor enter");
62     int tryCnt = DESTRUCT_TRY_COUNT;
63     // 取消定时器成功,或者中断了发送定时器,则释放we完成析构
64     while (!DelayedRemove(we_->tp, we_) && !abortSendTimer_.load()) {
65         if (--tryCnt < 0) {
66             break;
67         }
68         usleep(MIN_TIMEOUT_THRESHOLD_US);
69     }
70     SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
71     FFRT_LOGI("destruction of QueueMonitor leave");
72 }
73 
GetInstance()74 QueueMonitor& QueueMonitor::GetInstance()
75 {
76     static QueueMonitor instance;
77     return instance;
78 }
79 
RegisterQueueId(uint32_t queueId, QueueHandler* queueStruct)80 void QueueMonitor::RegisterQueueId(uint32_t queueId, QueueHandler* queueStruct)
81 {
82     std::unique_lock lock(mutex_);
83     if (queueId == queuesRunningInfo_.size()) {
84         queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
85         queuesStructInfo_.emplace_back(queueStruct);
86         lastReportedTask_.emplace_back(INVALID_TASK_ID);
87         FFRT_LOGD("queue registration in monitor gid=%u in turn succ", queueId);
88         return;
89     }
90 
91     // only need to ensure that the corresponding info index has been initialized after constructed.
92     if (queueId > queuesRunningInfo_.size()) {
93         for (uint32_t i = queuesRunningInfo_.size(); i <= queueId; ++i) {
94             queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
95             queuesStructInfo_.emplace_back(nullptr);
96             lastReportedTask_.emplace_back(INVALID_TASK_ID);
97         }
98         queuesStructInfo_[queueId] = queueStruct;
99     }
100     if (queuesStructInfo_[queueId] == nullptr) {
101         queuesStructInfo_[queueId] = queueStruct;
102     }
103     FFRT_LOGD("queue registration in monitor gid=%u by skip succ", queueId);
104 }
105 
ResetQueueInfo(uint32_t queueId)106 void QueueMonitor::ResetQueueInfo(uint32_t queueId)
107 {
108     std::shared_lock lock(mutex_);
109     FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
110         "ResetQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
111     queuesRunningInfo_[queueId].first = INVALID_TASK_ID;
112     lastReportedTask_[queueId] = INVALID_TASK_ID;
113 }
114 
ResetQueueStruct(uint32_t queueId)115 void QueueMonitor::ResetQueueStruct(uint32_t queueId)
116 {
117     std::shared_lock lock(mutex_);
118     FFRT_COND_DO_ERR((queuesStructInfo_.size() <= queueId), return,
119         "ResetQueueStruct queueId=%u access violation, StructInfo_.size=%u", queueId, queuesStructInfo_.size());
120     queuesStructInfo_[queueId] = nullptr;
121 }
122 
UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)123 void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)
124 {
125     std::shared_lock lock(mutex_);
126     FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
127         "UpdateQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
128     TimePoint now = std::chrono::steady_clock::now();
129     queuesRunningInfo_[queueId] = {taskId, now};
130     if (exit_.exchange(false)) {
131         abortSendTimer_.store(false);
132         SendDelayedWorker(now + std::chrono::microseconds(timeoutUs_));
133     }
134 }
135 
QueryQueueStatus(uint32_t queueId)136 uint64_t QueueMonitor::QueryQueueStatus(uint32_t queueId)
137 {
138     std::shared_lock lock(mutex_);
139     FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return INVALID_TASK_ID,
140         "QueryQueueStatus queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
141     return queuesRunningInfo_[queueId].first;
142 }
143 
SendDelayedWorker(TimePoint delay)144 void QueueMonitor::SendDelayedWorker(TimePoint delay)
145 {
146     FFRT_COND_DO_ERR(exit_.load(), abortSendTimer_.store(true);
147         return;,
148         "exit_.load() is true");
149 
150     we_->tp = delay;
151     we_->cb = ([this](WaitEntry* we_) { CheckQueuesStatus(); });
152 
153     bool result = DelayedWakeup(we_->tp, we_, we_->cb);
154     // insurance mechanism, generally does not fail
155     while (!result) {
156         FFRT_LOGW("failed to set delayedworker because the given timestamp has passed");
157         we_->tp = GetDelayedTimeStamp(ALLOW_TIME_ACC_ERROR_US);
158         result = DelayedWakeup(we_->tp, we_, we_->cb);
159     }
160 }
161 
ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)162 void QueueMonitor::ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)
163 {
164     std::unique_lock lock(mutex_);
165     if (queuesRunningInfo_[queueId].first == taskId) {
166         queuesRunningInfo_[queueId].second += std::chrono::microseconds(timeoutUs_);
167     }
168 }
169 
CheckQueuesStatus()170 void QueueMonitor::CheckQueuesStatus()
171 {
172     {
173         std::unique_lock lock(mutex_);
174         auto iter = std::find_if(queuesRunningInfo_.cbegin(), queuesRunningInfo_.cend(),
175             [](const auto& pair) { return pair.first != INVALID_TASK_ID; });
176         if (iter == queuesRunningInfo_.cend()) {
177             exit_ = true;
178             return;
179         }
180     }
181 
182     TimePoint oldestStartedTime = std::chrono::steady_clock::now();
183     TimePoint startThreshold = oldestStartedTime - std::chrono::microseconds(timeoutUs_ - ALLOW_TIME_ACC_ERROR_US);
184     uint64_t taskId = 0;
185     uint32_t queueRunningInfoSize = 0;
186     TimePoint taskTimestamp = oldestStartedTime;
187     {
188         std::shared_lock lock(mutex_);
189         queueRunningInfoSize = queuesRunningInfo_.size();
190     }
191 
192     // Displays information about queues that hold locks for a long time.
193     for (uint32_t i = 0; i < queueRunningInfoSize; ++i) {
194         if (queuesStructInfo_[i] == nullptr || queuesStructInfo_[i]->GetQueue() == nullptr) {
195             continue;
196         }
197 
198         if (!queuesStructInfo_[i]->GetQueue()->HasLock() || !queuesStructInfo_[i]->GetQueue()->IsLockTimeout()) {
199             continue;
200         }
201 
202         queuesStructInfo_[i]->GetQueue()->PrintMutexOwner();
203     }
204 
205     // Displays information about queues whose tasks time out.
206     for (uint32_t i = 0; i < queueRunningInfoSize; ++i) {
207         {
208             std::unique_lock lock(mutex_);
209             taskId = queuesRunningInfo_[i].first;
210             taskTimestamp = queuesRunningInfo_[i].second;
211         }
212 
213         if (taskId == INVALID_TASK_ID) {
214             continue;
215         }
216 
217         if (taskTimestamp < startThreshold) {
218             std::stringstream ss;
219             char processName[PROCESS_NAME_BUFFER_LENGTH];
220             GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
221             ss << "Serial_Queue_Timeout, process name:[" << processName << "], serial queue qid:[" << i
222                 << "], serial task gid:[" << taskId << "], execution:[" << timeoutUs_ << "] us.";
223             if (queuesStructInfo_[i] != nullptr) {
224                 ss << queuesStructInfo_[i]->GetDfxInfo();
225             }
226             FFRT_LOGE("%s", ss.str().c_str());
227 #ifdef FFRT_SEND_EVENT
228             if (lastReportedTask_[i] != taskId) {
229                 lastReportedTask_[i] = taskId;
230                 std::string processNameStr = std::string(processName);
231                 std::string senarioName = "Serial_Queue_Timeout";
232                 TaskTimeoutReport(ss, processNameStr, senarioName);
233             }
234 #endif
235             ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
236             if (func) {
237                 func(taskId, ss.str().c_str(), ss.str().size());
238             }
239             // reset timeout task timestamp for next warning
240             ResetTaskTimestampAfterWarning(i, taskId);
241             continue;
242         }
243 
244         if (taskTimestamp < oldestStartedTime) {
245             oldestStartedTime = taskTimestamp;
246         }
247     }
248 
249     SendDelayedWorker(oldestStartedTime + std::chrono::microseconds(timeoutUs_));
250     FFRT_LOGD("global watchdog completed queue status check and scheduled the next");
251 }
252 
HasQueueActive()253 bool QueueMonitor::HasQueueActive()
254 {
255     std::unique_lock lock(mutex_);
256     for (uint32_t i = 0; i < queuesRunningInfo_.size(); ++i) {
257         if (queuesRunningInfo_[i].first != INVALID_TASK_ID) {
258             return true;
259         }
260     }
261     return false;
262 }
263 } // namespace ffrt
264