1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "c/queue_ext.h"
16 #include "cpp/queue.h"
17 #include "util/event_handler_adapter.h"
18 #include "dm/dependence_manager.h"
19 #include "tm/queue_task.h"
20 #include "queue_handler.h"
21
22 using namespace std;
23 using namespace ffrt;
24
25 namespace {
ResetTimeoutCb(ffrt::queue_attr_private* p)26 inline void ResetTimeoutCb(ffrt::queue_attr_private* p)
27 {
28 if (p->timeoutCb_ == nullptr) {
29 return;
30 }
31 QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(p->timeoutCb_);
32 cbTask->DecDeleteRef();
33 p->timeoutCb_ = nullptr;
34 }
35
ffrt_queue_submit_base(ffrt_queue_t queue, ffrt_function_header_t* f, bool withHandle, bool insertHead, const ffrt_task_attr_t* attr)36 inline QueueTask* ffrt_queue_submit_base(ffrt_queue_t queue, ffrt_function_header_t* f, bool withHandle,
37 bool insertHead, const ffrt_task_attr_t* attr)
38 {
39 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return nullptr, "input invalid, queue == nullptr");
40 FFRT_COND_DO_ERR(unlikely(f == nullptr), return nullptr, "input invalid, function header == nullptr");
41 QueueHandler* handler = static_cast<QueueHandler*>(queue);
42 ffrt::task_attr_private *p = reinterpret_cast<ffrt::task_attr_private *>(const_cast<ffrt_task_attr_t *>(attr));
43 QueueTask* task = GetQueueTaskByFuncStorageOffset(f);
44 new (task)ffrt::QueueTask(handler, p, insertHead);
45 if (withHandle) {
46 task->IncDeleteRef();
47 }
48
49 handler->Submit(task);
50 return task;
51 }
52 } // namespace
53
54 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_init(ffrt_queue_attr_t* attr)55 int ffrt_queue_attr_init(ffrt_queue_attr_t* attr)
56 {
57 FFRT_COND_DO_ERR((attr == nullptr), return -1, "input invalid, attr == nullptr");
58 static_assert(sizeof(ffrt::queue_attr_private) <= ffrt_queue_attr_storage_size,
59 "size must be less than ffrt_queue_attr_storage_size");
60
61 new (attr) ffrt::queue_attr_private();
62 return 0;
63 }
64
65 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_destroy(ffrt_queue_attr_t* attr)66 void ffrt_queue_attr_destroy(ffrt_queue_attr_t* attr)
67 {
68 FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
69 auto p = reinterpret_cast<ffrt::queue_attr_private*>(attr);
70 ResetTimeoutCb(p);
71 p->~queue_attr_private();
72 }
73
74 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_qos(ffrt_queue_attr_t* attr, ffrt_qos_t qos)75 void ffrt_queue_attr_set_qos(ffrt_queue_attr_t* attr, ffrt_qos_t qos)
76 {
77 FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
78
79 (reinterpret_cast<ffrt::queue_attr_private*>(attr))->qos_ = ffrt::GetFuncQosMap()(qos);
80 }
81
82 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_qos(const ffrt_queue_attr_t* attr)83 ffrt_qos_t ffrt_queue_attr_get_qos(const ffrt_queue_attr_t* attr)
84 {
85 FFRT_COND_DO_ERR((attr == nullptr), return ffrt_qos_default, "input invalid, attr == nullptr");
86 ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
87 return (reinterpret_cast<ffrt::queue_attr_private*>(p))->qos_;
88 }
89
90 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_timeout(ffrt_queue_attr_t* attr, uint64_t timeout_us)91 void ffrt_queue_attr_set_timeout(ffrt_queue_attr_t* attr, uint64_t timeout_us)
92 {
93 FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
94 (reinterpret_cast<ffrt::queue_attr_private*>(attr))->timeout_ = timeout_us;
95 }
96
97 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_timeout(const ffrt_queue_attr_t* attr)98 uint64_t ffrt_queue_attr_get_timeout(const ffrt_queue_attr_t* attr)
99 {
100 FFRT_COND_DO_ERR((attr == nullptr), return 0, "input invalid, attr == nullptr");
101 ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
102 return (reinterpret_cast<ffrt::queue_attr_private*>(p))->timeout_;
103 }
104
105 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_callback(ffrt_queue_attr_t* attr, ffrt_function_header_t* f)106 void ffrt_queue_attr_set_callback(ffrt_queue_attr_t* attr, ffrt_function_header_t* f)
107 {
108 FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
109 ffrt::queue_attr_private* p = reinterpret_cast<ffrt::queue_attr_private*>(attr);
110 ResetTimeoutCb(p);
111 p->timeoutCb_ = f;
112 // the memory of timeoutCb are managed in the form of QueueTask
113 QueueTask* task = GetQueueTaskByFuncStorageOffset(f);
114 new (task)ffrt::QueueTask(nullptr);
115 }
116
117 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_callback(const ffrt_queue_attr_t* attr)118 ffrt_function_header_t* ffrt_queue_attr_get_callback(const ffrt_queue_attr_t* attr)
119 {
120 FFRT_COND_DO_ERR((attr == nullptr), return nullptr, "input invalid, attr == nullptr");
121 ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
122 return (reinterpret_cast<ffrt::queue_attr_private*>(p))->timeoutCb_;
123 }
124
125 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_max_concurrency(ffrt_queue_attr_t* attr, const int max_concurrency)126 void ffrt_queue_attr_set_max_concurrency(ffrt_queue_attr_t* attr, const int max_concurrency)
127 {
128 FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
129
130 FFRT_COND_DO_ERR((max_concurrency <= 0), return,
131 "max concurrency should be a valid value");
132
133 (reinterpret_cast<ffrt::queue_attr_private*>(attr))->maxConcurrency_ = max_concurrency;
134 }
135
136 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_max_concurrency(const ffrt_queue_attr_t* attr)137 int ffrt_queue_attr_get_max_concurrency(const ffrt_queue_attr_t* attr)
138 {
139 FFRT_COND_DO_ERR((attr == nullptr), return 0, "input invalid, attr == nullptr");
140 ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
141 return (reinterpret_cast<ffrt::queue_attr_private*>(p))->maxConcurrency_;
142 }
143
144 API_ATTRIBUTE((visibility("default")))
ffrt_queue_create(ffrt_queue_type_t type, const char* name, const ffrt_queue_attr_t* attr)145 ffrt_queue_t ffrt_queue_create(ffrt_queue_type_t type, const char* name, const ffrt_queue_attr_t* attr)
146 {
147 bool invalidType = (type == ffrt_queue_max) || (type < ffrt_queue_serial) ||
148 (type >= static_cast<ffrt_queue_type_t>(ffrt_queue_inner_max));
149 FFRT_COND_DO_ERR(invalidType, return nullptr, "input invalid, type unsupport");
150 QueueHandler* handler = new (std::nothrow) QueueHandler(name, attr, type);
151 FFRT_COND_DO_ERR((handler == nullptr), return nullptr, "failed to construct QueueHandler");
152 return static_cast<ffrt_queue_t>(handler);
153 }
154
155 API_ATTRIBUTE((visibility("default")))
ffrt_queue_destroy(ffrt_queue_t queue)156 void ffrt_queue_destroy(ffrt_queue_t queue)
157 {
158 FFRT_COND_DO_ERR((queue == nullptr), return, "input invalid, queue is nullptr");
159 QueueHandler* handler = static_cast<QueueHandler*>(queue);
160 delete handler;
161 }
162
163 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)164 void ffrt_queue_submit(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
165 {
166 FFRT_COND_DO_ERR((f == nullptr), return, "input invalid, function is nullptr");
167 QueueTask* task = ffrt_queue_submit_base(queue, f, false, false, attr);
168 FFRT_COND_DO_ERR((task == nullptr), return, "failed to submit serial task");
169 }
170
171 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_head(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)172 void ffrt_queue_submit_head(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
173 {
174 FFRT_COND_DO_ERR((f == nullptr), return, "input invalid, function is nullptr");
175 QueueTask* task = ffrt_queue_submit_base(queue, f, false, true, attr);
176 FFRT_COND_DO_ERR((task == nullptr), return, "failed to submit serial task");
177 }
178
179 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_h(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)180 ffrt_task_handle_t ffrt_queue_submit_h(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
181 {
182 FFRT_COND_DO_ERR((f == nullptr), return nullptr, "input invalid, function is nullptr");
183 QueueTask* task = ffrt_queue_submit_base(queue, f, true, false, attr);
184 FFRT_COND_DO_ERR((task == nullptr), return nullptr, "failed to submit serial task");
185 return static_cast<ffrt_task_handle_t>(task);
186 }
187
188 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_head_h(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)189 ffrt_task_handle_t ffrt_queue_submit_head_h(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
190 {
191 FFRT_COND_DO_ERR((f == nullptr), return nullptr, "input invalid, function is nullptr");
192 QueueTask* task = ffrt_queue_submit_base(queue, f, true, true, attr);
193 FFRT_COND_DO_ERR((task == nullptr), return nullptr, "failed to submit serial task");
194 return static_cast<ffrt_task_handle_t>(task);
195 }
196
197 API_ATTRIBUTE((visibility("default")))
ffrt_queue_wait(ffrt_task_handle_t handle)198 void ffrt_queue_wait(ffrt_task_handle_t handle)
199 {
200 FFRT_COND_DO_ERR((handle == nullptr), return, "input invalid, task_handle is nullptr");
201 QueueTask* task = static_cast<QueueTask*>(handle);
202 task->Wait();
203 }
204
205 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel(ffrt_task_handle_t handle)206 int ffrt_queue_cancel(ffrt_task_handle_t handle)
207 {
208 FFRT_COND_DO_ERR((handle == nullptr), return -1, "input invalid, handle is nullptr");
209 QueueTask* task = reinterpret_cast<QueueTask*>(static_cast<CPUEUTask*>(handle));
210 QueueHandler* handler = task->GetHandler();
211 FFRT_COND_DO_ERR((handler == nullptr), return -1, "task handler is nullptr");
212 int ret = handler->Cancel(task);
213 return ret;
214 }
215
216 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel_all(ffrt_queue_t queue)217 void ffrt_queue_cancel_all(ffrt_queue_t queue)
218 {
219 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return, "input invalid, queue is nullptr");
220 QueueHandler* handler = static_cast<QueueHandler*>(queue);
221 handler->Cancel();
222 }
223
224 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel_and_wait(ffrt_queue_t queue)225 void ffrt_queue_cancel_and_wait(ffrt_queue_t queue)
226 {
227 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return, "input invalid, queue is nullptr");
228 QueueHandler* handler = static_cast<QueueHandler*>(queue);
229 handler->CancelAndWait();
230 }
231
232 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel_by_name(ffrt_queue_t queue, const char* name)233 int ffrt_queue_cancel_by_name(ffrt_queue_t queue, const char* name)
234 {
235 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return -1, "input invalid, queue is nullptr");
236 FFRT_COND_DO_ERR(unlikely(name == nullptr), return -1, "input invalid, name is nullptr");
237 QueueHandler* handler = static_cast<QueueHandler*>(queue);
238 return handler->Cancel(name);
239 }
240
241 API_ATTRIBUTE((visibility("default")))
ffrt_queue_has_task(ffrt_queue_t queue, const char* name)242 bool ffrt_queue_has_task(ffrt_queue_t queue, const char* name)
243 {
244 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return false, "input invalid, queue is nullptr");
245 FFRT_COND_DO_ERR(unlikely(name == nullptr), return false, "input invalid, name is nullptr");
246 QueueHandler* handler = static_cast<QueueHandler*>(queue);
247 return handler->HasTask(name);
248 }
249
250 API_ATTRIBUTE((visibility("default")))
ffrt_queue_is_idle(ffrt_queue_t queue)251 bool ffrt_queue_is_idle(ffrt_queue_t queue)
252 {
253 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return false, "input invalid, queue is nullptr");
254 QueueHandler* handler = static_cast<QueueHandler*>(queue);
255 return handler->IsIdle();
256 }
257
258 API_ATTRIBUTE((visibility("default")))
ffrt_queue_set_eventhandler(ffrt_queue_t queue, void* eventhandler)259 void ffrt_queue_set_eventhandler(ffrt_queue_t queue, void* eventhandler)
260 {
261 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return, "input invalid, queue is nullptr");
262 QueueHandler* handler = static_cast<QueueHandler*>(queue);
263 handler->SetEventHandler(eventhandler);
264 }
265
266 API_ATTRIBUTE((visibility("default")))
ffrt_get_current_queue_eventhandler()267 void* ffrt_get_current_queue_eventhandler()
268 {
269 CPUEUTask* curTask = ffrt::ExecuteCtx::Cur()->task;
270 if (curTask == nullptr || curTask->type != ffrt_queue_task) {
271 FFRT_LOGW("Current task is nullptr or is not a serial task.");
272 return nullptr;
273 }
274
275 QueueHandler* handler = reinterpret_cast<QueueTask*>(curTask)->GetHandler();
276 FFRT_COND_DO_ERR((handler == nullptr), return nullptr, "task handler is nullptr");
277 return handler->GetEventHandler();
278 }
279
280 API_ATTRIBUTE((visibility("default")))
ffrt_get_main_queue()281 ffrt_queue_t ffrt_get_main_queue()
282 {
283 FFRT_COND_DO_ERR((EventHandlerAdapter::Instance()->GetMainEventHandler == nullptr),
284 return nullptr, "failed to load GetMainEventHandler Func.");
285 void* mainHandler = EventHandlerAdapter::Instance()->GetMainEventHandler();
286 FFRT_COND_DO_ERR((mainHandler == nullptr), return nullptr, "failed to get main queue.");
287 QueueHandler *handler = new (std::nothrow) QueueHandler(
288 "main_queue", nullptr, ffrt_queue_eventhandler_interactive);
289 FFRT_COND_DO_ERR((handler == nullptr), return nullptr, "failed to construct MainThreadQueueHandler");
290 handler->SetEventHandler(mainHandler);
291 return static_cast<ffrt_queue_t>(handler);
292 }
293
294 API_ATTRIBUTE((visibility("default")))
ffrt_get_current_queue()295 ffrt_queue_t ffrt_get_current_queue()
296 {
297 FFRT_COND_DO_ERR((EventHandlerAdapter::Instance()->GetCurrentEventHandler == nullptr),
298 return nullptr, "failed to load GetCurrentEventHandler Func.");
299 void* workerHandler = EventHandlerAdapter::Instance()->GetCurrentEventHandler();
300 FFRT_COND_DO_ERR((workerHandler == nullptr), return nullptr, "failed to get ArkTs worker queue.");
301 QueueHandler *handler = new (std::nothrow) QueueHandler(
302 "current_queue", nullptr, ffrt_queue_eventhandler_interactive);
303 FFRT_COND_DO_ERR((handler == nullptr), return nullptr, "failed to construct WorkerThreadQueueHandler");
304 handler->SetEventHandler(workerHandler);
305 return static_cast<ffrt_queue_t>(handler);
306 }
307
308 API_ATTRIBUTE((visibility("default")))
ffrt_queue_dump(ffrt_queue_t queue, const char* tag, char* buf, uint32_t len, bool history_info)309 int ffrt_queue_dump(ffrt_queue_t queue, const char* tag, char* buf, uint32_t len, bool history_info)
310 {
311 FFRT_COND_DO_ERR((queue == nullptr), return -1, "input invalid, queue is nullptr");
312 FFRT_COND_DO_ERR((tag == nullptr || buf == nullptr), return -1, "input invalid, tag or buf is nullptr");
313 QueueHandler* handler = static_cast<QueueHandler*>(queue);
314 return handler->Dump(tag, buf, len, history_info);
315 }
316
317 API_ATTRIBUTE((visibility("default")))
ffrt_queue_size_dump(ffrt_queue_t queue, ffrt_inner_queue_priority_t priority)318 int ffrt_queue_size_dump(ffrt_queue_t queue, ffrt_inner_queue_priority_t priority)
319 {
320 FFRT_COND_DO_ERR((queue == nullptr), return -1, "input invalid, queue is nullptr");
321 QueueHandler* handler = static_cast<QueueHandler*>(queue);
322 return handler->DumpSize(priority);
323 }