1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "queue_handler.h"
16 #include <sstream>
17 #include "dfx/log/ffrt_log_api.h"
18 #include "dfx/trace_record/ffrt_trace_record.h"
19 #include "queue_monitor.h"
20 #include "util/event_handler_adapter.h"
21 #include "util/ffrt_facade.h"
22 #include "util/slab.h"
23 #include "tm/queue_task.h"
24 #include "concurrent_queue.h"
25 #include "eventhandler_adapter_queue.h"
26 #include "sched/scheduler.h"
27 
namespace {
// Size of the static character buffer that receives the process name for timeout reports.
constexpr int PROCESS_NAME_BUFFER_LENGTH{1024};
// Longest user-supplied queue name accepted by the QueueHandler constructor.
constexpr uint32_t STRING_SIZE_MAX{128};
// Sleep interval, in microseconds, between polls while waiting for running tasks to finish.
constexpr uint32_t TASK_DONE_WAIT_UNIT{10};
} // namespace
33 
34 namespace ffrt {
QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type)35 QueueHandler::QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type)
36 {
37     // parse queue attribute
38     if (attr) {
39         qos_ = (ffrt_queue_attr_get_qos(attr) >= ffrt_qos_background) ? ffrt_queue_attr_get_qos(attr) : qos_;
40         timeout_ = ffrt_queue_attr_get_timeout(attr);
41         timeoutCb_ = ffrt_queue_attr_get_callback(attr);
42     }
43 
44     // callback reference counting is to ensure life cycle
45     if (timeout_ > 0 && timeoutCb_ != nullptr) {
46         QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
47         cbTask->IncDeleteRef();
48     }
49 
50     queue_ = CreateQueue(type, attr);
51     FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
52 
53     if (name != nullptr && std::string(name).size() <= STRING_SIZE_MAX) {
54         name_ = "sq_" + std::string(name) + "_" + std::to_string(GetQueueId());
55     } else {
56         name_ += "sq_unnamed_" + std::to_string(GetQueueId());
57         FFRT_LOGW("failed to set [queueId=%u] name due to invalid name or length.", GetQueueId());
58     }
59 
60     QueueMonitor::GetInstance().RegisterQueueId(GetQueueId(), this);
61     FFRT_LOGI("construct %s succ, qos[%d]", name_.c_str(), qos_);
62 }
63 
~QueueHandler()64 QueueHandler::~QueueHandler()
65 {
66     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot destruct, [queueId=%u] constructed failed", GetQueueId());
67     FFRT_LOGI("destruct %s enter", name_.c_str());
68     // clear tasks in queue
69     queue_->Stop();
70     while (QueueMonitor::GetInstance().QueryQueueStatus(GetQueueId()) || queue_->GetActiveStatus()) {
71         std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
72     }
73     QueueMonitor::GetInstance().ResetQueueStruct(GetQueueId());
74 
75     // release callback resource
76     if (timeout_ > 0) {
77         // wait for all delayedWorker to complete.
78         while (delayedCbCnt_.load() > 0) {
79             this_task::sleep_for(std::chrono::microseconds(timeout_));
80         }
81 
82         if (timeoutCb_ != nullptr) {
83             QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
84             cbTask->DecDeleteRef();
85         }
86     }
87     FFRT_LOGI("destruct %s leave", name_.c_str());
88 }
89 
SetLoop(Loop* loop)90 bool QueueHandler::SetLoop(Loop* loop)
91 {
92     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
93     if (queue_->GetQueueType() == ffrt_queue_eventhandler_interactive) {
94         return true;
95     }
96     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
97         return false, "[queueId=%u] type invalid", GetQueueId());
98     return reinterpret_cast<ConcurrentQueue*>(queue_.get())->SetLoop(loop);
99 }
100 
ClearLoop()101 bool QueueHandler::ClearLoop()
102 {
103     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
104     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
105         return false, "[queueId=%u] type invalid", GetQueueId());
106     return reinterpret_cast<ConcurrentQueue*>(queue_.get())->ClearLoop();
107 }
108 
PickUpTask()109 QueueTask* QueueHandler::PickUpTask()
110 {
111     FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
112     return queue_->Pull();
113 }
114 
Submit(QueueTask* task)115 void QueueHandler::Submit(QueueTask* task)
116 {
117     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot submit, [queueId=%u] constructed failed", GetQueueId());
118     FFRT_COND_DO_ERR((task == nullptr), return, "input invalid, serial task is nullptr");
119 
120     // if qos not specified, qos of the queue is inherited by task
121     if (task->GetQos() == qos_inherit || task->GetQos() == qos_default) {
122         task->SetQos(qos_);
123     }
124 
125     uint64_t gid = task->gid;
126     FFRT_SERIAL_QUEUE_TASK_SUBMIT_MARKER(GetQueueId(), gid);
127     FFRTTraceRecord::TaskSubmit(&(task->createTime), &(task->fromTid));
128 #if (FFRT_TRACE_RECORD_LEVEL < FFRT_TRACE_RECORD_LEVEL_1)
129     if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
130         task->fromTid = ExecuteCtx::Cur()->tid;
131     }
132 #endif
133     int ret = queue_->Push(task);
134     if (ret == SUCC) {
135         FFRT_LOGD("submit task[%lu] into %s", gid, name_.c_str());
136         return;
137     }
138     if (ret == FAILED) {
139         return;
140     }
141 
142     if (!isUsed_.load()) {
143         isUsed_.store(true);
144     }
145 
146     // activate queue
147     if (task->GetDelay() == 0) {
148         FFRT_LOGD("task [%llu] activate %s", gid, name_.c_str());
149         TransferTask(task);
150     } else {
151         FFRT_LOGD("task [%llu] with delay [%llu] activate %s", gid, task->GetDelay(), name_.c_str());
152         if (ret == INACTIVE) {
153             queue_->Push(task);
154         }
155         TransferInitTask();
156     }
157 }
158 
Cancel()159 void QueueHandler::Cancel()
160 {
161     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
162     queue_->Remove();
163 }
164 
CancelAndWait()165 void QueueHandler::CancelAndWait()
166 {
167     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancelAndWait, [queueId=%u] constructed failed",
168         GetQueueId());
169     queue_->Remove();
170     while (QueueMonitor::GetInstance().QueryQueueStatus(GetQueueId()) != 0 || queue_->GetActiveStatus()) {
171         std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
172     }
173 }
174 
Cancel(const char* name)175 int QueueHandler::Cancel(const char* name)
176 {
177     FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
178          "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
179     int ret = queue_->Remove(name);
180     if (ret != SUCC) {
181         FFRT_LOGD("cancel task %s failed, task may have been executed", name);
182     }
183 
184     return ret;
185 }
186 
Cancel(QueueTask* task)187 int QueueHandler::Cancel(QueueTask* task)
188 {
189     FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
190          "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
191     FFRT_COND_DO_ERR((task == nullptr), return INACTIVE, "input invalid, serial task is nullptr");
192 
193     int ret = queue_->Remove(task);
194     if (ret == SUCC) {
195         FFRT_LOGD("cancel task[%llu] %s succ", task->gid, task->label.c_str());
196         task->Notify();
197         task->Destroy();
198     } else {
199         FFRT_LOGD("cancel task[%llu] %s failed, task may have been executed", task->gid, task->label.c_str());
200     }
201     return ret;
202 }
203 
Dispatch(QueueTask* inTask)204 void QueueHandler::Dispatch(QueueTask* inTask)
205 {
206     QueueTask* nextTask = nullptr;
207     for (QueueTask* task = inTask; task != nullptr; task = nextTask) {
208         // dfx watchdog
209         SetTimeoutMonitor(task);
210         QueueMonitor::GetInstance().UpdateQueueInfo(GetQueueId(), task->gid);
211 
212         // run user task
213         FFRT_LOGD("run task [gid=%llu], queueId=%u", task->gid, GetQueueId());
214         auto f = reinterpret_cast<ffrt_function_header_t*>(task->func_storage);
215         FFRT_SERIAL_QUEUE_TASK_EXECUTE_MARKER(task->gid);
216         FFRTTraceRecord::TaskExecute(&(task->executeTime));
217         uint64_t triggerTime{0};
218         if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
219             triggerTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
220                 std::chrono::steady_clock::now().time_since_epoch()).count());
221         }
222 
223         f->exec(f);
224         FFRTTraceRecord::TaskDone<ffrt_queue_task>(task->GetQos(), task);
225         if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
226             uint64_t completeTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
227                 std::chrono::steady_clock::now().time_since_epoch()).count());
228             reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->PushHistoryTask(task, triggerTime, completeTime);
229         }
230 
231         f->destroy(f);
232         task->Notify();
233         RemoveTimeoutMonitor(task);
234 
235         // run task batch
236         nextTask = task->GetNextTask();
237         if (nextTask == nullptr) {
238             QueueMonitor::GetInstance().ResetQueueInfo(GetQueueId());
239             if (!queue_->IsOnLoop()) {
240                 Deliver();
241             }
242         }
243         task->DecDeleteRef();
244     }
245 }
246 
Deliver()247 void QueueHandler::Deliver()
248 {
249     QueueTask* task = queue_->Pull();
250     if (task != nullptr) {
251         TransferTask(task);
252     }
253 }
254 
TransferTask(QueueTask* task)255 void QueueHandler::TransferTask(QueueTask* task)
256 {
257     auto entry = &task->fq_we;
258     if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
259         reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->SetCurrentRunningTask(task);
260     }
261     FFRTScheduler* sch = FFRTFacade::GetSchedInstance();
262     FFRT_READY_MARKER(task->gid); // ffrt queue task ready to enque
263     if (!sch->InsertNode(&entry->node, task->GetQos())) {
264         FFRT_LOGE("failed to insert task [%llu] into %s", task->gid, name_.c_str());
265         return;
266     }
267 }
268 
TransferInitTask()269 void QueueHandler::TransferInitTask()
270 {
271     std::function<void()> initFunc = [] {};
272     auto f = create_function_wrapper(initFunc, ffrt_function_kind_queue);
273     QueueTask* initTask = GetQueueTaskByFuncStorageOffset(f);
274     new (initTask)ffrt::QueueTask(this);
275     initTask->SetQos(qos_);
276     TransferTask(initTask);
277 }
278 
SetTimeoutMonitor(QueueTask* task)279 void QueueHandler::SetTimeoutMonitor(QueueTask* task)
280 {
281     if (timeout_ <= 0) {
282         return;
283     }
284 
285     task->IncDeleteRef();
286     timeoutWe_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
287     // set delayed worker callback
288     timeoutWe_->cb = ([this, task](WaitEntry* timeoutWe_) {
289         if (!task->GetFinishStatus()) {
290             RunTimeOutCallback(task);
291         }
292         delayedCbCnt_.fetch_sub(1);
293         task->DecDeleteRef();
294         SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(timeoutWe_));
295     });
296 
297     // set delayed worker wakeup time
298     std::chrono::microseconds timeout(timeout_);
299     auto now = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
300     timeoutWe_->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
301 
302     if (!DelayedWakeup(timeoutWe_->tp, timeoutWe_, timeoutWe_->cb)) {
303         task->DecDeleteRef();
304         SimpleAllocator<WaitUntilEntry>::FreeMem(timeoutWe_);
305         FFRT_LOGW("failed to set watchdog for task gid=%llu in %s with timeout [%llu us] ", task->gid,
306             name_.c_str(), timeout_);
307         return;
308     }
309 
310     delayedCbCnt_.fetch_add(1);
311     FFRT_LOGD("set watchdog of task gid=%llu of %s succ", task->gid, name_.c_str());
312 }
313 
RemoveTimeoutMonitor(QueueTask* task)314 void QueueHandler::RemoveTimeoutMonitor(QueueTask* task)
315 {
316     if (timeout_ <= 0) {
317         return;
318     }
319 
320     WaitEntry* dwe = static_cast<WaitEntry*>(timeoutWe_);
321     if (DelayedRemove(timeoutWe_->tp, dwe)) {
322         delayedCbCnt_.fetch_sub(1);
323         SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(timeoutWe_));
324     }
325     return;
326 }
327 
RunTimeOutCallback(QueueTask* task)328 void QueueHandler::RunTimeOutCallback(QueueTask* task)
329 {
330     std::stringstream ss;
331     static std::once_flag flag;
332     static char processName[PROCESS_NAME_BUFFER_LENGTH];
333     std::call_once(flag, []() {
334         GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
335     });
336     std::string processNameStr = std::string(processName);
337     ss << "[Serial_Queue_Timeout_Callback] process name:[" << processNameStr << "], serial queue:[" <<
338         name_ << "], queueId:[" << GetQueueId() << "], serial task gid:[" << task->gid << "], task name:["
339         << task->label << "], execution time exceeds[" << timeout_ << "] us";
340     FFRT_LOGE("%s", ss.str().c_str());
341     if (timeoutCb_ != nullptr) {
342         timeoutCb_->exec(timeoutCb_);
343     }
344 }
345 
GetDfxInfo() const346 std::string QueueHandler::GetDfxInfo() const
347 {
348     std::stringstream ss;
349     ss << " queue name [" << name_ << "]";
350     if (queue_ != nullptr) {
351         ss << ", remaining tasks count=" << queue_->GetMapSize();
352     }
353     return ss.str();
354 }
355 
IsIdle()356 bool QueueHandler::IsIdle()
357 {
358     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
359     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
360         return false, "[queueId=%u] type invalid", GetQueueId());
361 
362     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->IsIdle();
363 }
364 
SetEventHandler(void* eventHandler)365 void QueueHandler::SetEventHandler(void* eventHandler)
366 {
367     FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
368 
369     bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
370         (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
371     FFRT_COND_DO_ERR(typeInvalid, return, "[queueId=%u] type invalid", GetQueueId());
372 
373     reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->SetEventHandler(eventHandler);
374 }
375 
GetEventHandler()376 void* QueueHandler::GetEventHandler()
377 {
378     FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
379 
380     bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
381         (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
382     FFRT_COND_DO_ERR(typeInvalid, return nullptr, "[queueId=%u] type invalid", GetQueueId());
383 
384     return reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->GetEventHandler();
385 }
386 
Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)387 int QueueHandler::Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)
388 {
389     FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
390     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
391         return -1, "[queueId=%u] type invalid", GetQueueId());
392     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->Dump(tag, buf, len, historyInfo);
393 }
394 
DumpSize(ffrt_inner_queue_priority_t priority)395 int QueueHandler::DumpSize(ffrt_inner_queue_priority_t priority)
396 {
397     FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
398     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
399         return -1, "[queueId=%u] type invalid", GetQueueId());
400     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->DumpSize(priority);
401 }
402 
403 } // namespace ffrt
404