1/*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#include "queue_monitor.h"
16#include <sstream>
17#include "dfx/log/ffrt_log_api.h"
18#include "util/slab.h"
19#include "sync/sync.h"
20#include "c/ffrt_dump.h"
21#include "dfx/sysevent/sysevent.h"
22#include "internal_inc/osal.h"
23
namespace {
// Size of the stack buffer that receives the process name for timeout reports.
constexpr int PROCESS_NAME_BUFFER_LENGTH = 1024;
// Task id stored in a queue slot while no task is recorded as running.
constexpr uint32_t INVALID_TASK_ID = 0;
// Multiplier applied to the configured timeout threshold to obtain microseconds
// (presumably the threshold is expressed in milliseconds -- confirm against the C API).
constexpr uint32_t TIME_CONVERT_UNIT = 1000;
// Initial capacity reserved for the per-queue bookkeeping vectors.
constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY = 64;
// Slack, in microseconds, used when judging a timeout and when re-arming a timer whose deadline already passed.
constexpr uint64_t ALLOW_TIME_ACC_ERROR_US = 500;
// Smallest timeout, in microseconds, for which the watchdog is armed; also the destructor's sleep step.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000;
// Upper bound on the destructor's attempts to cancel the pending timer.
constexpr uint64_t DESTRUCT_TRY_COUNT = 100;

// Returns the steady-clock time point lying delayUs microseconds after the moment of the call.
inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs)
{
    const auto offset = std::chrono::microseconds(delayUs);
    return std::chrono::steady_clock::now() + offset;
}
}
38
39namespace ffrt {
40QueueMonitor::QueueMonitor()
41{
42    FFRT_LOGI("queue monitor ctor enter");
43    queuesRunningInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
44    queuesStructInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
45    lastReportedTask_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
46    we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
47    uint64_t timeout = ffrt_task_timeout_get_threshold() * TIME_CONVERT_UNIT;
48    if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
49        timeoutUs_ = 0;
50        FFRT_LOGE("failed to setup watchdog because [%llu] us less than precision threshold", timeout);
51        return;
52    }
53    timeoutUs_ = timeout;
54    SendDelayedWorker(GetDelayedTimeStamp(timeoutUs_));
55    FFRT_LOGI("queue monitor ctor leave, watchdog timeout %llu us", timeoutUs_);
56}
57
58QueueMonitor::~QueueMonitor()
59{
60    exit_.store(true);
61    FFRT_LOGI("destruction of QueueMonitor enter");
62    int tryCnt = DESTRUCT_TRY_COUNT;
63    // 取消定时器成功,或者中断了发送定时器,则释放we完成析构
64    while (!DelayedRemove(we_->tp, we_) && !abortSendTimer_.load()) {
65        if (--tryCnt < 0) {
66            break;
67        }
68        usleep(MIN_TIMEOUT_THRESHOLD_US);
69    }
70    SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
71    FFRT_LOGI("destruction of QueueMonitor leave");
72}
73
74QueueMonitor& QueueMonitor::GetInstance()
75{
76    static QueueMonitor instance;
77    return instance;
78}
79
80void QueueMonitor::RegisterQueueId(uint32_t queueId, QueueHandler* queueStruct)
81{
82    std::unique_lock lock(mutex_);
83    if (queueId == queuesRunningInfo_.size()) {
84        queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
85        queuesStructInfo_.emplace_back(queueStruct);
86        lastReportedTask_.emplace_back(INVALID_TASK_ID);
87        FFRT_LOGD("queue registration in monitor gid=%u in turn succ", queueId);
88        return;
89    }
90
91    // only need to ensure that the corresponding info index has been initialized after constructed.
92    if (queueId > queuesRunningInfo_.size()) {
93        for (uint32_t i = queuesRunningInfo_.size(); i <= queueId; ++i) {
94            queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
95            queuesStructInfo_.emplace_back(nullptr);
96            lastReportedTask_.emplace_back(INVALID_TASK_ID);
97        }
98        queuesStructInfo_[queueId] = queueStruct;
99    }
100    if (queuesStructInfo_[queueId] == nullptr) {
101        queuesStructInfo_[queueId] = queueStruct;
102    }
103    FFRT_LOGD("queue registration in monitor gid=%u by skip succ", queueId);
104}
105
106void QueueMonitor::ResetQueueInfo(uint32_t queueId)
107{
108    std::shared_lock lock(mutex_);
109    FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
110        "ResetQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
111    queuesRunningInfo_[queueId].first = INVALID_TASK_ID;
112    lastReportedTask_[queueId] = INVALID_TASK_ID;
113}
114
115void QueueMonitor::ResetQueueStruct(uint32_t queueId)
116{
117    std::shared_lock lock(mutex_);
118    FFRT_COND_DO_ERR((queuesStructInfo_.size() <= queueId), return,
119        "ResetQueueStruct queueId=%u access violation, StructInfo_.size=%u", queueId, queuesStructInfo_.size());
120    queuesStructInfo_[queueId] = nullptr;
121}
122
123void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)
124{
125    std::shared_lock lock(mutex_);
126    FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
127        "UpdateQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
128    TimePoint now = std::chrono::steady_clock::now();
129    queuesRunningInfo_[queueId] = {taskId, now};
130    if (exit_.exchange(false)) {
131        abortSendTimer_.store(false);
132        SendDelayedWorker(now + std::chrono::microseconds(timeoutUs_));
133    }
134}
135
136uint64_t QueueMonitor::QueryQueueStatus(uint32_t queueId)
137{
138    std::shared_lock lock(mutex_);
139    FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return INVALID_TASK_ID,
140        "QueryQueueStatus queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
141    return queuesRunningInfo_[queueId].first;
142}
143
144void QueueMonitor::SendDelayedWorker(TimePoint delay)
145{
146    FFRT_COND_DO_ERR(exit_.load(), abortSendTimer_.store(true);
147        return;,
148        "exit_.load() is true");
149
150    we_->tp = delay;
151    we_->cb = ([this](WaitEntry* we_) { CheckQueuesStatus(); });
152
153    bool result = DelayedWakeup(we_->tp, we_, we_->cb);
154    // insurance mechanism, generally does not fail
155    while (!result) {
156        FFRT_LOGW("failed to set delayedworker because the given timestamp has passed");
157        we_->tp = GetDelayedTimeStamp(ALLOW_TIME_ACC_ERROR_US);
158        result = DelayedWakeup(we_->tp, we_, we_->cb);
159    }
160}
161
162void QueueMonitor::ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)
163{
164    std::unique_lock lock(mutex_);
165    if (queuesRunningInfo_[queueId].first == taskId) {
166        queuesRunningInfo_[queueId].second += std::chrono::microseconds(timeoutUs_);
167    }
168}
169
170void QueueMonitor::CheckQueuesStatus()
171{
172    {
173        std::unique_lock lock(mutex_);
174        auto iter = std::find_if(queuesRunningInfo_.cbegin(), queuesRunningInfo_.cend(),
175            [](const auto& pair) { return pair.first != INVALID_TASK_ID; });
176        if (iter == queuesRunningInfo_.cend()) {
177            exit_ = true;
178            return;
179        }
180    }
181
182    TimePoint oldestStartedTime = std::chrono::steady_clock::now();
183    TimePoint startThreshold = oldestStartedTime - std::chrono::microseconds(timeoutUs_ - ALLOW_TIME_ACC_ERROR_US);
184    uint64_t taskId = 0;
185    uint32_t queueRunningInfoSize = 0;
186    TimePoint taskTimestamp = oldestStartedTime;
187    {
188        std::shared_lock lock(mutex_);
189        queueRunningInfoSize = queuesRunningInfo_.size();
190    }
191
192    // Displays information about queues that hold locks for a long time.
193    for (uint32_t i = 0; i < queueRunningInfoSize; ++i) {
194        if (queuesStructInfo_[i] == nullptr || queuesStructInfo_[i]->GetQueue() == nullptr) {
195            continue;
196        }
197
198        if (!queuesStructInfo_[i]->GetQueue()->HasLock() || !queuesStructInfo_[i]->GetQueue()->IsLockTimeout()) {
199            continue;
200        }
201
202        queuesStructInfo_[i]->GetQueue()->PrintMutexOwner();
203    }
204
205    // Displays information about queues whose tasks time out.
206    for (uint32_t i = 0; i < queueRunningInfoSize; ++i) {
207        {
208            std::unique_lock lock(mutex_);
209            taskId = queuesRunningInfo_[i].first;
210            taskTimestamp = queuesRunningInfo_[i].second;
211        }
212
213        if (taskId == INVALID_TASK_ID) {
214            continue;
215        }
216
217        if (taskTimestamp < startThreshold) {
218            std::stringstream ss;
219            char processName[PROCESS_NAME_BUFFER_LENGTH];
220            GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
221            ss << "Serial_Queue_Timeout, process name:[" << processName << "], serial queue qid:[" << i
222                << "], serial task gid:[" << taskId << "], execution:[" << timeoutUs_ << "] us.";
223            if (queuesStructInfo_[i] != nullptr) {
224                ss << queuesStructInfo_[i]->GetDfxInfo();
225            }
226            FFRT_LOGE("%s", ss.str().c_str());
227#ifdef FFRT_SEND_EVENT
228            if (lastReportedTask_[i] != taskId) {
229                lastReportedTask_[i] = taskId;
230                std::string processNameStr = std::string(processName);
231                std::string senarioName = "Serial_Queue_Timeout";
232                TaskTimeoutReport(ss, processNameStr, senarioName);
233            }
234#endif
235            ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
236            if (func) {
237                func(taskId, ss.str().c_str(), ss.str().size());
238            }
239            // reset timeout task timestamp for next warning
240            ResetTaskTimestampAfterWarning(i, taskId);
241            continue;
242        }
243
244        if (taskTimestamp < oldestStartedTime) {
245            oldestStartedTime = taskTimestamp;
246        }
247    }
248
249    SendDelayedWorker(oldestStartedTime + std::chrono::microseconds(timeoutUs_));
250    FFRT_LOGD("global watchdog completed queue status check and scheduled the next");
251}
252
253bool QueueMonitor::HasQueueActive()
254{
255    std::unique_lock lock(mutex_);
256    for (uint32_t i = 0; i < queuesRunningInfo_.size(); ++i) {
257        if (queuesRunningInfo_[i].first != INVALID_TASK_ID) {
258            return true;
259        }
260    }
261    return false;
262}
263} // namespace ffrt
264