1/* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include "queue_monitor.h" 16#include <sstream> 17#include "dfx/log/ffrt_log_api.h" 18#include "util/slab.h" 19#include "sync/sync.h" 20#include "c/ffrt_dump.h" 21#include "dfx/sysevent/sysevent.h" 22#include "internal_inc/osal.h" 23 24namespace { 25constexpr int PROCESS_NAME_BUFFER_LENGTH = 1024; 26constexpr uint32_t INVALID_TASK_ID = 0; 27constexpr uint32_t TIME_CONVERT_UNIT = 1000; 28constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY = 64; 29constexpr uint64_t ALLOW_TIME_ACC_ERROR_US = 500; 30constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000; 31constexpr uint64_t DESTRUCT_TRY_COUNT = 100; 32 33inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs) 34{ 35 return std::chrono::steady_clock::now() + std::chrono::microseconds(delayUs); 36} 37} 38 39namespace ffrt { 40QueueMonitor::QueueMonitor() 41{ 42 FFRT_LOGI("queue monitor ctor enter"); 43 queuesRunningInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY); 44 queuesStructInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY); 45 lastReportedTask_.reserve(QUEUE_INFO_INITIAL_CAPACITY); 46 we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry(); 47 uint64_t timeout = ffrt_task_timeout_get_threshold() * TIME_CONVERT_UNIT; 48 if (timeout < MIN_TIMEOUT_THRESHOLD_US) { 49 timeoutUs_ = 0; 50 FFRT_LOGE("failed to setup watchdog because [%llu] us less than precision threshold", timeout); 51 return; 52 } 53 timeoutUs_ = timeout; 54 SendDelayedWorker(GetDelayedTimeStamp(timeoutUs_)); 55 FFRT_LOGI("queue monitor ctor leave, watchdog timeout %llu us", timeoutUs_); 56} 57 58QueueMonitor::~QueueMonitor() 59{ 60 exit_.store(true); 61 FFRT_LOGI("destruction of QueueMonitor enter"); 62 int tryCnt = DESTRUCT_TRY_COUNT; 63 // 取消定时器成功,或者中断了发送定时器,则释放we完成析构 64 while (!DelayedRemove(we_->tp, we_) && !abortSendTimer_.load()) { 65 if (--tryCnt < 0) { 66 break; 67 } 68 usleep(MIN_TIMEOUT_THRESHOLD_US); 69 } 70 SimpleAllocator<WaitUntilEntry>::FreeMem(we_); 71 FFRT_LOGI("destruction of QueueMonitor leave"); 72} 73 74QueueMonitor& QueueMonitor::GetInstance() 75{ 76 static QueueMonitor instance; 77 return instance; 78} 79 80void QueueMonitor::RegisterQueueId(uint32_t queueId, QueueHandler* queueStruct) 81{ 82 std::unique_lock lock(mutex_); 83 if (queueId == queuesRunningInfo_.size()) { 84 queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now())); 85 queuesStructInfo_.emplace_back(queueStruct); 86 lastReportedTask_.emplace_back(INVALID_TASK_ID); 87 FFRT_LOGD("queue registration in monitor gid=%u in turn succ", queueId); 88 return; 89 } 90 91 // only need to ensure that the corresponding info index has been initialized after constructed. 92 if (queueId > queuesRunningInfo_.size()) { 93 for (uint32_t i = queuesRunningInfo_.size(); i <= queueId; ++i) { 94 queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now())); 95 queuesStructInfo_.emplace_back(nullptr); 96 lastReportedTask_.emplace_back(INVALID_TASK_ID); 97 } 98 queuesStructInfo_[queueId] = queueStruct; 99 } 100 if (queuesStructInfo_[queueId] == nullptr) { 101 queuesStructInfo_[queueId] = queueStruct; 102 } 103 FFRT_LOGD("queue registration in monitor gid=%u by skip succ", queueId); 104} 105 106void QueueMonitor::ResetQueueInfo(uint32_t queueId) 107{ 108 std::shared_lock lock(mutex_); 109 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return, 110 "ResetQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size()); 111 queuesRunningInfo_[queueId].first = INVALID_TASK_ID; 112 lastReportedTask_[queueId] = INVALID_TASK_ID; 113} 114 115void QueueMonitor::ResetQueueStruct(uint32_t queueId) 116{ 117 std::shared_lock lock(mutex_); 118 FFRT_COND_DO_ERR((queuesStructInfo_.size() <= queueId), return, 119 "ResetQueueStruct queueId=%u access violation, StructInfo_.size=%u", queueId, queuesStructInfo_.size()); 120 queuesStructInfo_[queueId] = nullptr; 121} 122 123void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId) 124{ 125 std::shared_lock lock(mutex_); 126 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return, 127 "UpdateQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size()); 128 TimePoint now = std::chrono::steady_clock::now(); 129 queuesRunningInfo_[queueId] = {taskId, now}; 130 if (exit_.exchange(false)) { 131 abortSendTimer_.store(false); 132 SendDelayedWorker(now + std::chrono::microseconds(timeoutUs_)); 133 } 134} 135 136uint64_t QueueMonitor::QueryQueueStatus(uint32_t queueId) 137{ 138 std::shared_lock lock(mutex_); 139 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return INVALID_TASK_ID, 140 "QueryQueueStatus queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size()); 141 return queuesRunningInfo_[queueId].first; 142} 143 144void QueueMonitor::SendDelayedWorker(TimePoint delay) 145{ 146 FFRT_COND_DO_ERR(exit_.load(), abortSendTimer_.store(true); 147 return;, 148 "exit_.load() is true"); 149 150 we_->tp = delay; 151 we_->cb = ([this](WaitEntry* we_) { CheckQueuesStatus(); }); 152 153 bool result = DelayedWakeup(we_->tp, we_, we_->cb); 154 // insurance mechanism, generally does not fail 155 while (!result) { 156 FFRT_LOGW("failed to set delayedworker because the given timestamp has passed"); 157 we_->tp = GetDelayedTimeStamp(ALLOW_TIME_ACC_ERROR_US); 158 result = DelayedWakeup(we_->tp, we_, we_->cb); 159 } 160} 161 162void QueueMonitor::ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId) 163{ 164 std::unique_lock lock(mutex_); 165 if (queuesRunningInfo_[queueId].first == taskId) { 166 queuesRunningInfo_[queueId].second += std::chrono::microseconds(timeoutUs_); 167 } 168} 169 170void QueueMonitor::CheckQueuesStatus() 171{ 172 { 173 std::unique_lock lock(mutex_); 174 auto iter = std::find_if(queuesRunningInfo_.cbegin(), queuesRunningInfo_.cend(), 175 [](const auto& pair) { return pair.first != INVALID_TASK_ID; }); 176 if (iter == queuesRunningInfo_.cend()) { 177 exit_ = true; 178 return; 179 } 180 } 181 182 TimePoint oldestStartedTime = std::chrono::steady_clock::now(); 183 TimePoint startThreshold = oldestStartedTime - std::chrono::microseconds(timeoutUs_ - ALLOW_TIME_ACC_ERROR_US); 184 uint64_t taskId = 0; 185 uint32_t queueRunningInfoSize = 0; 186 TimePoint taskTimestamp = oldestStartedTime; 187 { 188 std::shared_lock lock(mutex_); 189 queueRunningInfoSize = queuesRunningInfo_.size(); 190 } 191 192 // Displays information about queues that hold locks for a long time. 193 for (uint32_t i = 0; i < queueRunningInfoSize; ++i) { 194 if (queuesStructInfo_[i] == nullptr || queuesStructInfo_[i]->GetQueue() == nullptr) { 195 continue; 196 } 197 198 if (!queuesStructInfo_[i]->GetQueue()->HasLock() || !queuesStructInfo_[i]->GetQueue()->IsLockTimeout()) { 199 continue; 200 } 201 202 queuesStructInfo_[i]->GetQueue()->PrintMutexOwner(); 203 } 204 205 // Displays information about queues whose tasks time out. 206 for (uint32_t i = 0; i < queueRunningInfoSize; ++i) { 207 { 208 std::unique_lock lock(mutex_); 209 taskId = queuesRunningInfo_[i].first; 210 taskTimestamp = queuesRunningInfo_[i].second; 211 } 212 213 if (taskId == INVALID_TASK_ID) { 214 continue; 215 } 216 217 if (taskTimestamp < startThreshold) { 218 std::stringstream ss; 219 char processName[PROCESS_NAME_BUFFER_LENGTH]; 220 GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH); 221 ss << "Serial_Queue_Timeout, process name:[" << processName << "], serial queue qid:[" << i 222 << "], serial task gid:[" << taskId << "], execution:[" << timeoutUs_ << "] us."; 223 if (queuesStructInfo_[i] != nullptr) { 224 ss << queuesStructInfo_[i]->GetDfxInfo(); 225 } 226 FFRT_LOGE("%s", ss.str().c_str()); 227#ifdef FFRT_SEND_EVENT 228 if (lastReportedTask_[i] != taskId) { 229 lastReportedTask_[i] = taskId; 230 std::string processNameStr = std::string(processName); 231 std::string senarioName = "Serial_Queue_Timeout"; 232 TaskTimeoutReport(ss, processNameStr, senarioName); 233 } 234#endif 235 ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb(); 236 if (func) { 237 func(taskId, ss.str().c_str(), ss.str().size()); 238 } 239 // reset timeout task timestamp for next warning 240 ResetTaskTimestampAfterWarning(i, taskId); 241 continue; 242 } 243 244 if (taskTimestamp < oldestStartedTime) { 245 oldestStartedTime = taskTimestamp; 246 } 247 } 248 249 SendDelayedWorker(oldestStartedTime + std::chrono::microseconds(timeoutUs_)); 250 FFRT_LOGD("global watchdog completed queue status check and scheduled the next"); 251} 252 253bool QueueMonitor::HasQueueActive() 254{ 255 std::unique_lock lock(mutex_); 256 for (uint32_t i = 0; i < queuesRunningInfo_.size(); ++i) { 257 if (queuesRunningInfo_[i].first != INVALID_TASK_ID) { 258 return true; 259 } 260 } 261 return false; 262} 263} // namespace ffrt 264