1/* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include "thread_manager.h" 16 17#include <errno.h> 18#include <pthread.h> 19#include <stdatomic.h> 20 21#include "appspawn_utils.h" 22#include "list.h" 23 24typedef struct { 25 atomic_uint threadExit; 26 uint32_t index; 27 pthread_t threadId; 28} ThreadNode; 29 30typedef struct { 31 pthread_mutex_t mutex; // 保护执行队列 32 pthread_cond_t cond; // 线程等待条件 33 ListNode taskList; // 任务队列,任务还没有启动 34 ListNode waitingTaskQueue; // 启动的任务,排队等待执行 35 ListNode executingTaskQueue; // 正在执行 36 ListNode executorQueue; // 执行节点,保存 TaskExecuteNode 37 uint32_t executorCount; 38 uint32_t maxThreadCount; 39 uint32_t currTaskId; 40 struct timespec lastAdjust; 41 uint32_t validThreadCount; 42 ThreadNode threadNode[1]; // 线程信息,控制线程的退出和结束 43} ThreadManager; 44 45typedef struct { 46 uint32_t taskId; 47 ListNode node; 48 ListNode executorList; 49 uint32_t totalTask; 50 atomic_uint taskFlags; // 表示任务是否被取消,各线程检查后决定任务线程是否结束 51 atomic_uint finishTaskCount; 52 const ThreadContext *context; 53 TaskFinishProcessor finishProcess; 54 pthread_mutex_t mutex; // 保护执行队列 55 pthread_cond_t cond; // 同步执行时,等待确认 56} TaskNode; 57 58typedef struct { 59 ListNode node; // 保存sub task到对应的task,方便管理 60 ListNode executeNode; // 等待处理的任务节点 61 TaskNode *task; 62 const ThreadContext *context; 63 TaskExecutor executor; 64} TaskExecuteNode; 65 66static ThreadManager *g_threadManager = NULL; 67 68static void *ManagerThreadProc(void *args); 69static void *ThreadExecute(void *args); 70 71static void SetCondAttr(pthread_cond_t *cond) 72{ 73 pthread_condattr_t attr; 74 pthread_condattr_init(&attr); 75 pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); 76 pthread_cond_init(cond, &attr); 77 pthread_condattr_destroy(&attr); 78} 79 80static void ConvertToTimespec(int time, struct timespec *tm) 81{ 82 struct timespec start; 83 clock_gettime(CLOCK_MONOTONIC, &start); 84 uint64_t ns = time; 85 ns *= APPSPAWN_MSEC_TO_NSEC; 86 ns += start.tv_sec * APPSPAWN_SEC_TO_NSEC + start.tv_nsec; 87 tm->tv_sec = ns / APPSPAWN_SEC_TO_NSEC; 88 tm->tv_nsec = ns % APPSPAWN_SEC_TO_NSEC; 89} 90 91static TaskExecuteNode *PopTaskExecutor(ThreadManager *mgr) 92{ 93 TaskExecuteNode *executor = NULL; 94 pthread_mutex_lock(&mgr->mutex); 95 ListNode *node = mgr->executorQueue.next; 96 if (node != &mgr->executorQueue) { 97 OH_ListRemove(node); 98 OH_ListInit(node); 99 executor = ListEntry(node, TaskExecuteNode, executeNode); 100 mgr->executorCount--; 101 } 102 pthread_mutex_unlock(&mgr->mutex); 103 return executor; 104} 105 106static int AddExecutor(ThreadManager *mgr, const TaskNode *task) 107{ 108 ListNode *node = task->executorList.next; 109 while (node != &task->executorList) { 110 TaskExecuteNode *executor = ListEntry(node, TaskExecuteNode, node); 111 APPSPAWN_LOGV("AddExecutor task: %{public}u executorCount: %{public}u executor: %{public}u", 112 task->taskId, mgr->executorCount, executor->task->taskId); 113 114 // 插入尾部执行 115 pthread_mutex_lock(&mgr->mutex); 116 OH_ListRemove(&executor->executeNode); 117 OH_ListInit(&executor->executeNode); 118 OH_ListAddTail(&mgr->executorQueue, &executor->executeNode); 119 mgr->executorCount++; 120 pthread_mutex_unlock(&mgr->mutex); 121 122 node = node->next; 123 } 124 return 0; 125} 126 127static void RunExecutor(ThreadManager *mgr, ThreadNode *threadNode, uint32_t maxCount) 128{ 129 APPSPAWN_LOGV("RunExecutor in thread: %{public}d executorCount: %{public}u ", 130 threadNode->index, mgr->executorCount); 131 TaskExecuteNode *executor = PopTaskExecutor(mgr); 132 uint32_t count = 0; 133 while (executor != NULL && !threadNode->threadExit) { 134 APPSPAWN_LOGV("RunExecutor task: %{public}u", executor->task->taskId); 135 atomic_fetch_add(&executor->task->finishTaskCount, 1); 136 executor->executor(executor->task->taskId, executor->context); 137 count++; 138 if (count >= maxCount) { 139 break; 140 } 141 executor = PopTaskExecutor(mgr); 142 } 143 APPSPAWN_LOGV("RunExecutor executorCount: %{public}u end", mgr->executorCount); 144} 145 146static int TaskCompareTaskId(ListNode *node, void *data) 147{ 148 TaskNode *task = ListEntry(node, TaskNode, node); 149 return task->taskId - *(uint32_t *)data; 150} 151 152static TaskNode *GetTask(ThreadManager *mgr, ListNode *queue, uint32_t taskId) 153{ 154 ListNode *node = NULL; 155 pthread_mutex_lock(&mgr->mutex); 156 node = OH_ListFind(queue, &taskId, TaskCompareTaskId); 157 pthread_mutex_unlock(&mgr->mutex); 158 if (node == NULL) { 159 return NULL; 160 } 161 return ListEntry(node, TaskNode, node); 162} 163 164static void DeleteTask(TaskNode *task) 165{ 166 APPSPAWN_LOGV("DeleteTask task: %{public}u ", task->taskId); 167 168 if (!ListEmpty(task->node)) { 169 return; 170 } 171 OH_ListRemoveAll(&task->executorList, NULL); 172 pthread_cond_destroy(&task->cond); 173 pthread_mutex_destroy(&task->mutex); 174 free(task); 175} 176 177static TaskNode *PopTask(ThreadManager *mgr, ListNode *queue) 178{ 179 TaskNode *task = NULL; 180 pthread_mutex_lock(&mgr->mutex); 181 ListNode *node = queue->next; 182 if (node != queue) { 183 OH_ListRemove(node); 184 OH_ListInit(node); 185 task = ListEntry(node, TaskNode, node); 186 } 187 pthread_mutex_unlock(&mgr->mutex); 188 return task; 189} 190 191static void PushTask(ThreadManager *mgr, TaskNode *task, ListNode *queue) 192{ 193 pthread_mutex_lock(&mgr->mutex); 194 OH_ListAddTail(queue, &task->node); 195 pthread_cond_broadcast(&mgr->cond); 196 pthread_mutex_unlock(&mgr->mutex); 197} 198 199static void SafeRemoveTask(ThreadManager *mgr, TaskNode *task) 200{ 201 pthread_mutex_lock(&mgr->mutex); 202 OH_ListRemove(&task->node); 203 OH_ListInit(&task->node); 204 pthread_mutex_unlock(&mgr->mutex); 205 206 ListNode *node = task->executorList.next; 207 while (node != &task->executorList) { 208 OH_ListRemove(node); 209 OH_ListInit(node); 210 TaskExecuteNode *executor = ListEntry(node, TaskExecuteNode, node); 211 pthread_mutex_lock(&mgr->mutex); 212 if (!ListEmpty(executor->executeNode)) { 213 OH_ListRemove(&executor->executeNode); 214 OH_ListInit(&executor->executeNode); 215 mgr->executorCount--; 216 } 217 pthread_mutex_unlock(&mgr->mutex); 218 free(executor); 219 220 node = task->executorList.next; 221 } 222} 223 224static void ExecuteTask(ThreadManager *mgr) 225{ 226 TaskNode *task = PopTask(mgr, &mgr->waitingTaskQueue); 227 if (task == NULL) { 228 return; 229 } 230 231 APPSPAWN_LOGV("ExecuteTask task: %{public}u ", task->taskId); 232 AddExecutor(mgr, task); 233 PushTask(mgr, task, &mgr->executingTaskQueue); 234 return; 235} 236 237static void CheckTaskComplete(ThreadManager *mgr) 238{ 239 TaskNode *task = PopTask(mgr, &mgr->executingTaskQueue); 240 if (task == NULL) { 241 return; 242 } 243 if (task->totalTask <= atomic_load(&task->finishTaskCount)) { 244 if (task->finishProcess != NULL) { 245 task->finishProcess(task->taskId, task->context); 246 DeleteTask(task); 247 return; 248 } 249 pthread_mutex_lock(&task->mutex); 250 pthread_cond_signal(&task->cond); 251 pthread_mutex_unlock(&task->mutex); 252 return; 253 } 254 PushTask(mgr, task, &mgr->executingTaskQueue); 255 return; 256} 257 258static void TaskQueueDestroyProc(ListNode *node) 259{ 260 OH_ListRemove(node); 261 TaskNode *task = ListEntry(node, TaskNode, node); 262 DeleteTask(task); 263} 264 265int CreateThreadMgr(uint32_t maxThreadCount, ThreadMgr *instance) 266{ 267 if (g_threadManager != NULL) { 268 *instance = (ThreadMgr)g_threadManager; 269 return 0; 270 } 271 272 ThreadManager *mgr = (ThreadManager *)malloc(sizeof(ThreadManager) + maxThreadCount * sizeof(ThreadNode)); 273 APPSPAWN_CHECK(mgr != NULL, return -1, "Failed to create thread manager"); 274 275 mgr->executorCount = 0; 276 mgr->currTaskId = 0; 277 mgr->validThreadCount = 0; 278 mgr->maxThreadCount = maxThreadCount; 279 OH_ListInit(&mgr->taskList); 280 OH_ListInit(&mgr->waitingTaskQueue); 281 OH_ListInit(&mgr->executingTaskQueue); 282 OH_ListInit(&mgr->executorQueue); 283 pthread_mutex_init(&mgr->mutex, NULL); 284 SetCondAttr(&mgr->cond); 285 286 for (uint32_t index = 0; index < maxThreadCount + 1; index++) { 287 mgr->threadNode[index].index = index; 288 mgr->threadNode[index].threadId = INVALID_THREAD_ID; 289 atomic_init(&mgr->threadNode[index].threadExit, 0); 290 } 291 g_threadManager = mgr; 292 int ret = pthread_create(&mgr->threadNode[0].threadId, NULL, ManagerThreadProc, (void *)&mgr->threadNode[0]); 293 if (ret != 0) { 294 APPSPAWN_LOGE("Failed to create thread for manager"); 295 g_threadManager = NULL; 296 free(mgr); 297 return -1; 298 } 299 *instance = (ThreadMgr)mgr; 300 APPSPAWN_LOGV("Create thread manager success maxThreadCount: %{public}u", maxThreadCount); 301 return 0; 302} 303 304int DestroyThreadMgr(ThreadMgr instance) 305{ 306 APPSPAWN_LOGV("DestroyThreadMgr"); 307 ThreadManager *mgr = (ThreadManager *)instance; 308 APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 309 310 for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) { 311 if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) { 312 atomic_store(&mgr->threadNode[index].threadExit, 1); 313 APPSPAWN_LOGV("DestroyThreadMgr index %{public}d %{public}d", index, mgr->threadNode[index].threadExit); 314 } 315 } 316 pthread_mutex_lock(&mgr->mutex); 317 pthread_cond_broadcast(&mgr->cond); 318 pthread_mutex_unlock(&mgr->mutex); 319 for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) { 320 if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) { 321 pthread_join(mgr->threadNode[index].threadId, NULL); 322 APPSPAWN_LOGV("DestroyThreadMgr index %{public}d end", index); 323 } 324 } 325 326 pthread_mutex_lock(&mgr->mutex); 327 OH_ListRemoveAll(&mgr->taskList, TaskQueueDestroyProc); 328 OH_ListRemoveAll(&mgr->waitingTaskQueue, TaskQueueDestroyProc); 329 OH_ListRemoveAll(&mgr->executingTaskQueue, TaskQueueDestroyProc); 330 OH_ListRemoveAll(&mgr->executorQueue, TaskQueueDestroyProc); 331 pthread_mutex_unlock(&mgr->mutex); 332 333 pthread_cond_destroy(&mgr->cond); 334 pthread_mutex_destroy(&mgr->mutex); 335 return 0; 336} 337 338int ThreadMgrAddTask(ThreadMgr instance, ThreadTaskHandle *taskHandle) 339{ 340 ThreadManager *mgr = (ThreadManager *)instance; 341 APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 342 TaskNode *task = (TaskNode *)malloc(sizeof(TaskNode)); 343 APPSPAWN_CHECK(task != NULL, return -1, "Failed to create thread task"); 344 345 task->context = NULL; 346 task->finishProcess = NULL; 347 task->totalTask = 0; 348 atomic_init(&task->taskFlags, 0); 349 atomic_init(&task->finishTaskCount, 0); 350 OH_ListInit(&task->node); 351 OH_ListInit(&task->executorList); 352 pthread_mutex_init(&task->mutex, NULL); 353 SetCondAttr(&task->cond); 354 355 pthread_mutex_lock(&mgr->mutex); 356 task->taskId = mgr->currTaskId++; 357 OH_ListAddTail(&mgr->taskList, &task->node); 358 pthread_mutex_unlock(&mgr->mutex); 359 *taskHandle = task->taskId; 360 APPSPAWN_LOGV("Create thread task success task id: %{public}u", task->taskId); 361 return 0; 362} 363 364int ThreadMgrAddExecutor(ThreadMgr instance, 365 ThreadTaskHandle taskHandle, TaskExecutor executor, const ThreadContext *context) 366{ 367 ThreadManager *mgr = (ThreadManager *)instance; 368 APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 369 TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle); 370 APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle); 371 372 TaskExecuteNode *node = (TaskExecuteNode *)malloc(sizeof(TaskExecuteNode)); 373 APPSPAWN_CHECK(node != NULL, return -1, "Failed to create thread executor for task %{public}u", taskHandle); 374 node->task = task; 375 OH_ListInit(&node->node); 376 OH_ListInit(&node->executeNode); 377 node->context = context; 378 node->executor = executor; 379 task->totalTask++; 380 OH_ListAddTail(&task->executorList, &node->node); 381 return 0; 382} 383 384int ThreadMgrCancelTask(ThreadMgr instance, ThreadTaskHandle taskHandle) 385{ 386 ThreadManager *mgr = (ThreadManager *)instance; 387 APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 388 TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle); 389 if (task != NULL) { 390 SafeRemoveTask(mgr, task); 391 DeleteTask(task); 392 return 0; 393 } 394 task = GetTask(mgr, &mgr->waitingTaskQueue, taskHandle); 395 if (task != NULL) { 396 SafeRemoveTask(mgr, task); 397 DeleteTask(task); 398 return 0; 399 } 400 task = GetTask(mgr, &mgr->executingTaskQueue, taskHandle); 401 if (task != NULL) { 402 SafeRemoveTask(mgr, task); 403 DeleteTask(task); 404 return 0; 405 } 406 return 0; 407} 408 409int TaskSyncExecute(ThreadMgr instance, ThreadTaskHandle taskHandle) 410{ 411 ThreadManager *mgr = (ThreadManager *)instance; 412 APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 413 TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle); 414 APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle); 415 416 pthread_mutex_lock(&task->mutex); 417 OH_ListRemove(&task->node); 418 OH_ListInit(&task->node); 419 OH_ListAddTail(&mgr->waitingTaskQueue, &task->node); 420 pthread_cond_broadcast(&mgr->cond); 421 pthread_mutex_unlock(&task->mutex); 422 APPSPAWN_LOGV("TaskSyncExecute task: %{public}u", task->taskId); 423 struct timespec abstime; 424 int ret = 0; 425 do { 426 ConvertToTimespec(60 * 1000, &abstime); // wait 60 * 1000 60s 427 pthread_mutex_lock(&task->mutex); 428 ret = pthread_cond_timedwait(&task->cond, &task->mutex, &abstime); 429 pthread_mutex_unlock(&task->mutex); 430 APPSPAWN_LOGV("TaskSyncExecute success task id: %{public}u ret: %{public}d", task->taskId, ret); 431 } while (ret == ETIMEDOUT); 432 433 DeleteTask(task); 434 return ret; 435} 436 437int TaskExecute(ThreadMgr instance, 438 ThreadTaskHandle taskHandle, TaskFinishProcessor process, const ThreadContext *context) 439{ 440 ThreadManager *mgr = (ThreadManager *)instance; 441 APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 442 TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle); 443 APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle); 444 445 task->finishProcess = process; 446 task->context = context; 447 pthread_mutex_lock(&mgr->mutex); 448 OH_ListRemove(&task->node); 449 OH_ListInit(&task->node); 450 OH_ListAddTail(&mgr->waitingTaskQueue, &task->node); 451 pthread_cond_broadcast(&mgr->cond); 452 pthread_mutex_unlock(&mgr->mutex); 453 APPSPAWN_LOGV("TaskExecute task: %{public}u", task->taskId); 454 return 0; 455} 456 457static void CheckAndCreateNewThread(ThreadManager *mgr) 458{ 459 if (mgr->maxThreadCount <= mgr->validThreadCount) { 460 return; 461 } 462 if (mgr->executorCount <= mgr->validThreadCount) { 463 return; 464 } 465 APPSPAWN_LOGV("CheckAndCreateNewThread maxThreadCount: %{public}u validThreadCount: %{public}u %{public}u", 466 mgr->maxThreadCount, mgr->validThreadCount, mgr->executorCount); 467 468 uint32_t totalThread = mgr->maxThreadCount; 469 if (mgr->executorCount <= mgr->maxThreadCount) { 470 totalThread = mgr->executorCount; 471 } 472 473 for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) { 474 if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) { 475 continue; 476 } 477 int ret = pthread_create(&mgr->threadNode[index].threadId, 478 NULL, ThreadExecute, (void *)&(mgr->threadNode[index])); 479 APPSPAWN_CHECK(ret == 0, return, "Failed to create thread for %{public}u", index); 480 APPSPAWN_LOGV("Create thread success index: %{public}u", mgr->threadNode[index].index); 481 mgr->validThreadCount++; 482 if (mgr->validThreadCount >= totalThread) { 483 return; 484 } 485 } 486 return; 487} 488 489static void *ManagerThreadProc(void *args) 490{ 491 ThreadManager *mgr = g_threadManager; 492 ThreadNode *threadNode = (ThreadNode *)args; 493 struct timespec abstime; 494 while (!threadNode->threadExit) { 495 pthread_mutex_lock(&mgr->mutex); 496 do { 497 uint32_t timeout = 60 * 1000; // 60 * 1000 60s 498 if (!ListEmpty(mgr->waitingTaskQueue)) { 499 break; 500 } 501 if (!ListEmpty(mgr->executingTaskQueue)) { 502 timeout = 500; // 500ms 503 } 504 ConvertToTimespec(timeout, &abstime); 505 int ret = pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime); 506 if (!ListEmpty(mgr->executingTaskQueue) || ret == ETIMEDOUT) { 507 break; 508 } 509 if (threadNode->threadExit) { 510 break; 511 } 512 } while (1); 513 pthread_mutex_unlock(&mgr->mutex); 514 515 ExecuteTask(mgr); 516 CheckAndCreateNewThread(mgr); 517 518 if (mgr->validThreadCount == 0) { 519 RunExecutor(mgr, threadNode, 5); // 5 max thread 520 } 521 CheckTaskComplete(mgr); 522 } 523 return 0; 524} 525 526static void *ThreadExecute(void *args) 527{ 528 ThreadManager *mgr = g_threadManager; 529 ThreadNode *threadNode = (ThreadNode *)args; 530 struct timespec abstime; 531 while (!threadNode->threadExit) { 532 pthread_mutex_lock(&mgr->mutex); 533 while (ListEmpty(mgr->executorQueue) && !threadNode->threadExit) { 534 ConvertToTimespec(60 * 1000, &abstime); // 60 * 1000 60s 535 pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime); 536 } 537 pthread_mutex_unlock(&mgr->mutex); 538 APPSPAWN_LOGV("bbbb threadNode->threadExit %{public}d", threadNode->threadExit); 539 RunExecutor(mgr, threadNode, 1); 540 } 541 return NULL; 542} 543