1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "queue_handler.h"
16 #include <sstream>
17 #include "dfx/log/ffrt_log_api.h"
18 #include "dfx/trace_record/ffrt_trace_record.h"
19 #include "queue_monitor.h"
20 #include "util/event_handler_adapter.h"
21 #include "util/ffrt_facade.h"
22 #include "util/slab.h"
23 #include "tm/queue_task.h"
24 #include "concurrent_queue.h"
25 #include "eventhandler_adapter_queue.h"
26 #include "sched/scheduler.h"
27
namespace {
// Size of the static buffer that receives the process name for timeout reports.
constexpr int PROCESS_NAME_BUFFER_LENGTH{1024};
// Longest user-supplied queue name (in characters) accepted by the constructor.
constexpr uint32_t STRING_SIZE_MAX{128};
// Polling period, in microseconds, used while waiting for queue tasks to finish.
constexpr uint32_t TASK_DONE_WAIT_UNIT{10};
} // namespace
33
34 namespace ffrt {
QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type)35 QueueHandler::QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type)
36 {
37 // parse queue attribute
38 if (attr) {
39 qos_ = (ffrt_queue_attr_get_qos(attr) >= ffrt_qos_background) ? ffrt_queue_attr_get_qos(attr) : qos_;
40 timeout_ = ffrt_queue_attr_get_timeout(attr);
41 timeoutCb_ = ffrt_queue_attr_get_callback(attr);
42 }
43
44 // callback reference counting is to ensure life cycle
45 if (timeout_ > 0 && timeoutCb_ != nullptr) {
46 QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
47 cbTask->IncDeleteRef();
48 }
49
50 queue_ = CreateQueue(type, attr);
51 FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
52
53 if (name != nullptr && std::string(name).size() <= STRING_SIZE_MAX) {
54 name_ = "sq_" + std::string(name) + "_" + std::to_string(GetQueueId());
55 } else {
56 name_ += "sq_unnamed_" + std::to_string(GetQueueId());
57 FFRT_LOGW("failed to set [queueId=%u] name due to invalid name or length.", GetQueueId());
58 }
59
60 QueueMonitor::GetInstance().RegisterQueueId(GetQueueId(), this);
61 FFRT_LOGI("construct %s succ, qos[%d]", name_.c_str(), qos_);
62 }
63
~QueueHandler()64 QueueHandler::~QueueHandler()
65 {
66 FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot destruct, [queueId=%u] constructed failed", GetQueueId());
67 FFRT_LOGI("destruct %s enter", name_.c_str());
68 // clear tasks in queue
69 queue_->Stop();
70 while (QueueMonitor::GetInstance().QueryQueueStatus(GetQueueId()) || queue_->GetActiveStatus()) {
71 std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
72 }
73 QueueMonitor::GetInstance().ResetQueueStruct(GetQueueId());
74
75 // release callback resource
76 if (timeout_ > 0) {
77 // wait for all delayedWorker to complete.
78 while (delayedCbCnt_.load() > 0) {
79 this_task::sleep_for(std::chrono::microseconds(timeout_));
80 }
81
82 if (timeoutCb_ != nullptr) {
83 QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
84 cbTask->DecDeleteRef();
85 }
86 }
87 FFRT_LOGI("destruct %s leave", name_.c_str());
88 }
89
SetLoop(Loop* loop)90 bool QueueHandler::SetLoop(Loop* loop)
91 {
92 FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
93 if (queue_->GetQueueType() == ffrt_queue_eventhandler_interactive) {
94 return true;
95 }
96 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
97 return false, "[queueId=%u] type invalid", GetQueueId());
98 return reinterpret_cast<ConcurrentQueue*>(queue_.get())->SetLoop(loop);
99 }
100
ClearLoop()101 bool QueueHandler::ClearLoop()
102 {
103 FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
104 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
105 return false, "[queueId=%u] type invalid", GetQueueId());
106 return reinterpret_cast<ConcurrentQueue*>(queue_.get())->ClearLoop();
107 }
108
PickUpTask()109 QueueTask* QueueHandler::PickUpTask()
110 {
111 FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
112 return queue_->Pull();
113 }
114
Submit(QueueTask* task)115 void QueueHandler::Submit(QueueTask* task)
116 {
117 FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot submit, [queueId=%u] constructed failed", GetQueueId());
118 FFRT_COND_DO_ERR((task == nullptr), return, "input invalid, serial task is nullptr");
119
120 // if qos not specified, qos of the queue is inherited by task
121 if (task->GetQos() == qos_inherit || task->GetQos() == qos_default) {
122 task->SetQos(qos_);
123 }
124
125 uint64_t gid = task->gid;
126 FFRT_SERIAL_QUEUE_TASK_SUBMIT_MARKER(GetQueueId(), gid);
127 FFRTTraceRecord::TaskSubmit(&(task->createTime), &(task->fromTid));
128 #if (FFRT_TRACE_RECORD_LEVEL < FFRT_TRACE_RECORD_LEVEL_1)
129 if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
130 task->fromTid = ExecuteCtx::Cur()->tid;
131 }
132 #endif
133 int ret = queue_->Push(task);
134 if (ret == SUCC) {
135 FFRT_LOGD("submit task[%lu] into %s", gid, name_.c_str());
136 return;
137 }
138 if (ret == FAILED) {
139 return;
140 }
141
142 if (!isUsed_.load()) {
143 isUsed_.store(true);
144 }
145
146 // activate queue
147 if (task->GetDelay() == 0) {
148 FFRT_LOGD("task [%llu] activate %s", gid, name_.c_str());
149 TransferTask(task);
150 } else {
151 FFRT_LOGD("task [%llu] with delay [%llu] activate %s", gid, task->GetDelay(), name_.c_str());
152 if (ret == INACTIVE) {
153 queue_->Push(task);
154 }
155 TransferInitTask();
156 }
157 }
158
Cancel()159 void QueueHandler::Cancel()
160 {
161 FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
162 queue_->Remove();
163 }
164
CancelAndWait()165 void QueueHandler::CancelAndWait()
166 {
167 FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancelAndWait, [queueId=%u] constructed failed",
168 GetQueueId());
169 queue_->Remove();
170 while (QueueMonitor::GetInstance().QueryQueueStatus(GetQueueId()) != 0 || queue_->GetActiveStatus()) {
171 std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
172 }
173 }
174
Cancel(const char* name)175 int QueueHandler::Cancel(const char* name)
176 {
177 FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
178 "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
179 int ret = queue_->Remove(name);
180 if (ret != SUCC) {
181 FFRT_LOGD("cancel task %s failed, task may have been executed", name);
182 }
183
184 return ret;
185 }
186
Cancel(QueueTask* task)187 int QueueHandler::Cancel(QueueTask* task)
188 {
189 FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
190 "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
191 FFRT_COND_DO_ERR((task == nullptr), return INACTIVE, "input invalid, serial task is nullptr");
192
193 int ret = queue_->Remove(task);
194 if (ret == SUCC) {
195 FFRT_LOGD("cancel task[%llu] %s succ", task->gid, task->label.c_str());
196 task->Notify();
197 task->Destroy();
198 } else {
199 FFRT_LOGD("cancel task[%llu] %s failed, task may have been executed", task->gid, task->label.c_str());
200 }
201 return ret;
202 }
203
Dispatch(QueueTask* inTask)204 void QueueHandler::Dispatch(QueueTask* inTask)
205 {
206 QueueTask* nextTask = nullptr;
207 for (QueueTask* task = inTask; task != nullptr; task = nextTask) {
208 // dfx watchdog
209 SetTimeoutMonitor(task);
210 QueueMonitor::GetInstance().UpdateQueueInfo(GetQueueId(), task->gid);
211
212 // run user task
213 FFRT_LOGD("run task [gid=%llu], queueId=%u", task->gid, GetQueueId());
214 auto f = reinterpret_cast<ffrt_function_header_t*>(task->func_storage);
215 FFRT_SERIAL_QUEUE_TASK_EXECUTE_MARKER(task->gid);
216 FFRTTraceRecord::TaskExecute(&(task->executeTime));
217 uint64_t triggerTime{0};
218 if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
219 triggerTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
220 std::chrono::steady_clock::now().time_since_epoch()).count());
221 }
222
223 f->exec(f);
224 FFRTTraceRecord::TaskDone<ffrt_queue_task>(task->GetQos(), task);
225 if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
226 uint64_t completeTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
227 std::chrono::steady_clock::now().time_since_epoch()).count());
228 reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->PushHistoryTask(task, triggerTime, completeTime);
229 }
230
231 f->destroy(f);
232 task->Notify();
233 RemoveTimeoutMonitor(task);
234
235 // run task batch
236 nextTask = task->GetNextTask();
237 if (nextTask == nullptr) {
238 QueueMonitor::GetInstance().ResetQueueInfo(GetQueueId());
239 if (!queue_->IsOnLoop()) {
240 Deliver();
241 }
242 }
243 task->DecDeleteRef();
244 }
245 }
246
Deliver()247 void QueueHandler::Deliver()
248 {
249 QueueTask* task = queue_->Pull();
250 if (task != nullptr) {
251 TransferTask(task);
252 }
253 }
254
TransferTask(QueueTask* task)255 void QueueHandler::TransferTask(QueueTask* task)
256 {
257 auto entry = &task->fq_we;
258 if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
259 reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->SetCurrentRunningTask(task);
260 }
261 FFRTScheduler* sch = FFRTFacade::GetSchedInstance();
262 FFRT_READY_MARKER(task->gid); // ffrt queue task ready to enque
263 if (!sch->InsertNode(&entry->node, task->GetQos())) {
264 FFRT_LOGE("failed to insert task [%llu] into %s", task->gid, name_.c_str());
265 return;
266 }
267 }
268
TransferInitTask()269 void QueueHandler::TransferInitTask()
270 {
271 std::function<void()> initFunc = [] {};
272 auto f = create_function_wrapper(initFunc, ffrt_function_kind_queue);
273 QueueTask* initTask = GetQueueTaskByFuncStorageOffset(f);
274 new (initTask)ffrt::QueueTask(this);
275 initTask->SetQos(qos_);
276 TransferTask(initTask);
277 }
278
SetTimeoutMonitor(QueueTask* task)279 void QueueHandler::SetTimeoutMonitor(QueueTask* task)
280 {
281 if (timeout_ <= 0) {
282 return;
283 }
284
285 task->IncDeleteRef();
286 timeoutWe_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
287 // set delayed worker callback
288 timeoutWe_->cb = ([this, task](WaitEntry* timeoutWe_) {
289 if (!task->GetFinishStatus()) {
290 RunTimeOutCallback(task);
291 }
292 delayedCbCnt_.fetch_sub(1);
293 task->DecDeleteRef();
294 SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(timeoutWe_));
295 });
296
297 // set delayed worker wakeup time
298 std::chrono::microseconds timeout(timeout_);
299 auto now = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
300 timeoutWe_->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
301
302 if (!DelayedWakeup(timeoutWe_->tp, timeoutWe_, timeoutWe_->cb)) {
303 task->DecDeleteRef();
304 SimpleAllocator<WaitUntilEntry>::FreeMem(timeoutWe_);
305 FFRT_LOGW("failed to set watchdog for task gid=%llu in %s with timeout [%llu us] ", task->gid,
306 name_.c_str(), timeout_);
307 return;
308 }
309
310 delayedCbCnt_.fetch_add(1);
311 FFRT_LOGD("set watchdog of task gid=%llu of %s succ", task->gid, name_.c_str());
312 }
313
RemoveTimeoutMonitor(QueueTask* task)314 void QueueHandler::RemoveTimeoutMonitor(QueueTask* task)
315 {
316 if (timeout_ <= 0) {
317 return;
318 }
319
320 WaitEntry* dwe = static_cast<WaitEntry*>(timeoutWe_);
321 if (DelayedRemove(timeoutWe_->tp, dwe)) {
322 delayedCbCnt_.fetch_sub(1);
323 SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(timeoutWe_));
324 }
325 return;
326 }
327
RunTimeOutCallback(QueueTask* task)328 void QueueHandler::RunTimeOutCallback(QueueTask* task)
329 {
330 std::stringstream ss;
331 static std::once_flag flag;
332 static char processName[PROCESS_NAME_BUFFER_LENGTH];
333 std::call_once(flag, []() {
334 GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
335 });
336 std::string processNameStr = std::string(processName);
337 ss << "[Serial_Queue_Timeout_Callback] process name:[" << processNameStr << "], serial queue:[" <<
338 name_ << "], queueId:[" << GetQueueId() << "], serial task gid:[" << task->gid << "], task name:["
339 << task->label << "], execution time exceeds[" << timeout_ << "] us";
340 FFRT_LOGE("%s", ss.str().c_str());
341 if (timeoutCb_ != nullptr) {
342 timeoutCb_->exec(timeoutCb_);
343 }
344 }
345
GetDfxInfo() const346 std::string QueueHandler::GetDfxInfo() const
347 {
348 std::stringstream ss;
349 ss << " queue name [" << name_ << "]";
350 if (queue_ != nullptr) {
351 ss << ", remaining tasks count=" << queue_->GetMapSize();
352 }
353 return ss.str();
354 }
355
IsIdle()356 bool QueueHandler::IsIdle()
357 {
358 FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
359 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
360 return false, "[queueId=%u] type invalid", GetQueueId());
361
362 return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->IsIdle();
363 }
364
SetEventHandler(void* eventHandler)365 void QueueHandler::SetEventHandler(void* eventHandler)
366 {
367 FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
368
369 bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
370 (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
371 FFRT_COND_DO_ERR(typeInvalid, return, "[queueId=%u] type invalid", GetQueueId());
372
373 reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->SetEventHandler(eventHandler);
374 }
375
GetEventHandler()376 void* QueueHandler::GetEventHandler()
377 {
378 FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
379
380 bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
381 (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
382 FFRT_COND_DO_ERR(typeInvalid, return nullptr, "[queueId=%u] type invalid", GetQueueId());
383
384 return reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->GetEventHandler();
385 }
386
Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)387 int QueueHandler::Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)
388 {
389 FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
390 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
391 return -1, "[queueId=%u] type invalid", GetQueueId());
392 return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->Dump(tag, buf, len, historyInfo);
393 }
394
DumpSize(ffrt_inner_queue_priority_t priority)395 int QueueHandler::DumpSize(ffrt_inner_queue_priority_t priority)
396 {
397 FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
398 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
399 return -1, "[queueId=%u] type invalid", GetQueueId());
400 return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->DumpSize(priority);
401 }
402
403 } // namespace ffrt
404