169570cc8Sopenharmony_ci/* 269570cc8Sopenharmony_ci * Copyright (c) 2024 Huawei Device Co., Ltd. 369570cc8Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 469570cc8Sopenharmony_ci * you may not use this file except in compliance with the License. 569570cc8Sopenharmony_ci * You may obtain a copy of the License at 669570cc8Sopenharmony_ci * 769570cc8Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 869570cc8Sopenharmony_ci * 969570cc8Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 1069570cc8Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 1169570cc8Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1269570cc8Sopenharmony_ci * See the License for the specific language governing permissions and 1369570cc8Sopenharmony_ci * limitations under the License. 1469570cc8Sopenharmony_ci */ 1569570cc8Sopenharmony_ci#include "thread_manager.h" 1669570cc8Sopenharmony_ci 1769570cc8Sopenharmony_ci#include <errno.h> 1869570cc8Sopenharmony_ci#include <pthread.h> 1969570cc8Sopenharmony_ci#include <stdatomic.h> 2069570cc8Sopenharmony_ci 2169570cc8Sopenharmony_ci#include "appspawn_utils.h" 2269570cc8Sopenharmony_ci#include "list.h" 2369570cc8Sopenharmony_ci 2469570cc8Sopenharmony_citypedef struct { 2569570cc8Sopenharmony_ci atomic_uint threadExit; 2669570cc8Sopenharmony_ci uint32_t index; 2769570cc8Sopenharmony_ci pthread_t threadId; 2869570cc8Sopenharmony_ci} ThreadNode; 2969570cc8Sopenharmony_ci 3069570cc8Sopenharmony_citypedef struct { 3169570cc8Sopenharmony_ci pthread_mutex_t mutex; // 保护执行队列 3269570cc8Sopenharmony_ci pthread_cond_t cond; // 线程等待条件 3369570cc8Sopenharmony_ci ListNode taskList; // 任务队列,任务还没有启动 3469570cc8Sopenharmony_ci ListNode waitingTaskQueue; // 启动的任务,排队等待执行 3569570cc8Sopenharmony_ci ListNode executingTaskQueue; // 正在执行 3669570cc8Sopenharmony_ci ListNode executorQueue; // 执行节点,保存 TaskExecuteNode 3769570cc8Sopenharmony_ci uint32_t executorCount; 3869570cc8Sopenharmony_ci uint32_t maxThreadCount; 3969570cc8Sopenharmony_ci uint32_t currTaskId; 4069570cc8Sopenharmony_ci struct timespec lastAdjust; 4169570cc8Sopenharmony_ci uint32_t validThreadCount; 4269570cc8Sopenharmony_ci ThreadNode threadNode[1]; // 线程信息,控制线程的退出和结束 4369570cc8Sopenharmony_ci} ThreadManager; 4469570cc8Sopenharmony_ci 4569570cc8Sopenharmony_citypedef struct { 4669570cc8Sopenharmony_ci uint32_t taskId; 4769570cc8Sopenharmony_ci ListNode node; 4869570cc8Sopenharmony_ci ListNode executorList; 4969570cc8Sopenharmony_ci uint32_t totalTask; 5069570cc8Sopenharmony_ci atomic_uint taskFlags; // 表示任务是否被取消,各线程检查后决定任务线程是否结束 5169570cc8Sopenharmony_ci atomic_uint finishTaskCount; 5269570cc8Sopenharmony_ci const ThreadContext *context; 5369570cc8Sopenharmony_ci TaskFinishProcessor finishProcess; 5469570cc8Sopenharmony_ci pthread_mutex_t mutex; // 保护执行队列 5569570cc8Sopenharmony_ci pthread_cond_t cond; // 同步执行时,等待确认 5669570cc8Sopenharmony_ci} TaskNode; 5769570cc8Sopenharmony_ci 5869570cc8Sopenharmony_citypedef struct { 5969570cc8Sopenharmony_ci ListNode node; // 保存sub task到对应的task,方便管理 6069570cc8Sopenharmony_ci ListNode executeNode; // 等待处理的任务节点 6169570cc8Sopenharmony_ci TaskNode *task; 6269570cc8Sopenharmony_ci const ThreadContext *context; 6369570cc8Sopenharmony_ci TaskExecutor executor; 6469570cc8Sopenharmony_ci} TaskExecuteNode; 6569570cc8Sopenharmony_ci 6669570cc8Sopenharmony_cistatic ThreadManager *g_threadManager = NULL; 6769570cc8Sopenharmony_ci 6869570cc8Sopenharmony_cistatic void *ManagerThreadProc(void *args); 6969570cc8Sopenharmony_cistatic void *ThreadExecute(void *args); 7069570cc8Sopenharmony_ci 7169570cc8Sopenharmony_cistatic void SetCondAttr(pthread_cond_t *cond) 7269570cc8Sopenharmony_ci{ 7369570cc8Sopenharmony_ci pthread_condattr_t attr; 7469570cc8Sopenharmony_ci pthread_condattr_init(&attr); 7569570cc8Sopenharmony_ci pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); 7669570cc8Sopenharmony_ci pthread_cond_init(cond, &attr); 7769570cc8Sopenharmony_ci pthread_condattr_destroy(&attr); 7869570cc8Sopenharmony_ci} 7969570cc8Sopenharmony_ci 8069570cc8Sopenharmony_cistatic void ConvertToTimespec(int time, struct timespec *tm) 8169570cc8Sopenharmony_ci{ 8269570cc8Sopenharmony_ci struct timespec start; 8369570cc8Sopenharmony_ci clock_gettime(CLOCK_MONOTONIC, &start); 8469570cc8Sopenharmony_ci uint64_t ns = time; 8569570cc8Sopenharmony_ci ns *= APPSPAWN_MSEC_TO_NSEC; 8669570cc8Sopenharmony_ci ns += start.tv_sec * APPSPAWN_SEC_TO_NSEC + start.tv_nsec; 8769570cc8Sopenharmony_ci tm->tv_sec = ns / APPSPAWN_SEC_TO_NSEC; 8869570cc8Sopenharmony_ci tm->tv_nsec = ns % APPSPAWN_SEC_TO_NSEC; 8969570cc8Sopenharmony_ci} 9069570cc8Sopenharmony_ci 9169570cc8Sopenharmony_cistatic TaskExecuteNode *PopTaskExecutor(ThreadManager *mgr) 9269570cc8Sopenharmony_ci{ 9369570cc8Sopenharmony_ci TaskExecuteNode *executor = NULL; 9469570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 9569570cc8Sopenharmony_ci ListNode *node = mgr->executorQueue.next; 9669570cc8Sopenharmony_ci if (node != &mgr->executorQueue) { 9769570cc8Sopenharmony_ci OH_ListRemove(node); 9869570cc8Sopenharmony_ci OH_ListInit(node); 9969570cc8Sopenharmony_ci executor = ListEntry(node, TaskExecuteNode, executeNode); 10069570cc8Sopenharmony_ci mgr->executorCount--; 10169570cc8Sopenharmony_ci } 10269570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 10369570cc8Sopenharmony_ci return executor; 10469570cc8Sopenharmony_ci} 10569570cc8Sopenharmony_ci 10669570cc8Sopenharmony_cistatic int AddExecutor(ThreadManager *mgr, const TaskNode *task) 10769570cc8Sopenharmony_ci{ 10869570cc8Sopenharmony_ci ListNode *node = task->executorList.next; 10969570cc8Sopenharmony_ci while (node != &task->executorList) { 11069570cc8Sopenharmony_ci TaskExecuteNode *executor = ListEntry(node, TaskExecuteNode, node); 11169570cc8Sopenharmony_ci APPSPAWN_LOGV("AddExecutor task: %{public}u executorCount: %{public}u executor: %{public}u", 11269570cc8Sopenharmony_ci task->taskId, mgr->executorCount, executor->task->taskId); 11369570cc8Sopenharmony_ci 11469570cc8Sopenharmony_ci // 插入尾部执行 11569570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 11669570cc8Sopenharmony_ci OH_ListRemove(&executor->executeNode); 11769570cc8Sopenharmony_ci OH_ListInit(&executor->executeNode); 11869570cc8Sopenharmony_ci OH_ListAddTail(&mgr->executorQueue, &executor->executeNode); 11969570cc8Sopenharmony_ci mgr->executorCount++; 12069570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 12169570cc8Sopenharmony_ci 12269570cc8Sopenharmony_ci node = node->next; 12369570cc8Sopenharmony_ci } 12469570cc8Sopenharmony_ci return 0; 12569570cc8Sopenharmony_ci} 12669570cc8Sopenharmony_ci 12769570cc8Sopenharmony_cistatic void RunExecutor(ThreadManager *mgr, ThreadNode *threadNode, uint32_t maxCount) 12869570cc8Sopenharmony_ci{ 12969570cc8Sopenharmony_ci APPSPAWN_LOGV("RunExecutor in thread: %{public}d executorCount: %{public}u ", 13069570cc8Sopenharmony_ci threadNode->index, mgr->executorCount); 13169570cc8Sopenharmony_ci TaskExecuteNode *executor = PopTaskExecutor(mgr); 13269570cc8Sopenharmony_ci uint32_t count = 0; 13369570cc8Sopenharmony_ci while (executor != NULL && !threadNode->threadExit) { 13469570cc8Sopenharmony_ci APPSPAWN_LOGV("RunExecutor task: %{public}u", executor->task->taskId); 13569570cc8Sopenharmony_ci atomic_fetch_add(&executor->task->finishTaskCount, 1); 13669570cc8Sopenharmony_ci executor->executor(executor->task->taskId, executor->context); 13769570cc8Sopenharmony_ci count++; 13869570cc8Sopenharmony_ci if (count >= maxCount) { 13969570cc8Sopenharmony_ci break; 14069570cc8Sopenharmony_ci } 14169570cc8Sopenharmony_ci executor = PopTaskExecutor(mgr); 14269570cc8Sopenharmony_ci } 14369570cc8Sopenharmony_ci APPSPAWN_LOGV("RunExecutor executorCount: %{public}u end", mgr->executorCount); 14469570cc8Sopenharmony_ci} 14569570cc8Sopenharmony_ci 14669570cc8Sopenharmony_cistatic int TaskCompareTaskId(ListNode *node, void *data) 14769570cc8Sopenharmony_ci{ 14869570cc8Sopenharmony_ci TaskNode *task = ListEntry(node, TaskNode, node); 14969570cc8Sopenharmony_ci return task->taskId - *(uint32_t *)data; 15069570cc8Sopenharmony_ci} 15169570cc8Sopenharmony_ci 15269570cc8Sopenharmony_cistatic TaskNode *GetTask(ThreadManager *mgr, ListNode *queue, uint32_t taskId) 15369570cc8Sopenharmony_ci{ 15469570cc8Sopenharmony_ci ListNode *node = NULL; 15569570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 15669570cc8Sopenharmony_ci node = OH_ListFind(queue, &taskId, TaskCompareTaskId); 15769570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 15869570cc8Sopenharmony_ci if (node == NULL) { 15969570cc8Sopenharmony_ci return NULL; 16069570cc8Sopenharmony_ci } 16169570cc8Sopenharmony_ci return ListEntry(node, TaskNode, node); 16269570cc8Sopenharmony_ci} 16369570cc8Sopenharmony_ci 16469570cc8Sopenharmony_cistatic void DeleteTask(TaskNode *task) 16569570cc8Sopenharmony_ci{ 16669570cc8Sopenharmony_ci APPSPAWN_LOGV("DeleteTask task: %{public}u ", task->taskId); 16769570cc8Sopenharmony_ci 16869570cc8Sopenharmony_ci if (!ListEmpty(task->node)) { 16969570cc8Sopenharmony_ci return; 17069570cc8Sopenharmony_ci } 17169570cc8Sopenharmony_ci OH_ListRemoveAll(&task->executorList, NULL); 17269570cc8Sopenharmony_ci pthread_cond_destroy(&task->cond); 17369570cc8Sopenharmony_ci pthread_mutex_destroy(&task->mutex); 17469570cc8Sopenharmony_ci free(task); 17569570cc8Sopenharmony_ci} 17669570cc8Sopenharmony_ci 17769570cc8Sopenharmony_cistatic TaskNode *PopTask(ThreadManager *mgr, ListNode *queue) 17869570cc8Sopenharmony_ci{ 17969570cc8Sopenharmony_ci TaskNode *task = NULL; 18069570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 18169570cc8Sopenharmony_ci ListNode *node = queue->next; 18269570cc8Sopenharmony_ci if (node != queue) { 18369570cc8Sopenharmony_ci OH_ListRemove(node); 18469570cc8Sopenharmony_ci OH_ListInit(node); 18569570cc8Sopenharmony_ci task = ListEntry(node, TaskNode, node); 18669570cc8Sopenharmony_ci } 18769570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 18869570cc8Sopenharmony_ci return task; 18969570cc8Sopenharmony_ci} 19069570cc8Sopenharmony_ci 19169570cc8Sopenharmony_cistatic void PushTask(ThreadManager *mgr, TaskNode *task, ListNode *queue) 19269570cc8Sopenharmony_ci{ 19369570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 19469570cc8Sopenharmony_ci OH_ListAddTail(queue, &task->node); 19569570cc8Sopenharmony_ci pthread_cond_broadcast(&mgr->cond); 19669570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 19769570cc8Sopenharmony_ci} 19869570cc8Sopenharmony_ci 19969570cc8Sopenharmony_cistatic void SafeRemoveTask(ThreadManager *mgr, TaskNode *task) 20069570cc8Sopenharmony_ci{ 20169570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 20269570cc8Sopenharmony_ci OH_ListRemove(&task->node); 20369570cc8Sopenharmony_ci OH_ListInit(&task->node); 20469570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 20569570cc8Sopenharmony_ci 20669570cc8Sopenharmony_ci ListNode *node = task->executorList.next; 20769570cc8Sopenharmony_ci while (node != &task->executorList) { 20869570cc8Sopenharmony_ci OH_ListRemove(node); 20969570cc8Sopenharmony_ci OH_ListInit(node); 21069570cc8Sopenharmony_ci TaskExecuteNode *executor = ListEntry(node, TaskExecuteNode, node); 21169570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 21269570cc8Sopenharmony_ci if (!ListEmpty(executor->executeNode)) { 21369570cc8Sopenharmony_ci OH_ListRemove(&executor->executeNode); 21469570cc8Sopenharmony_ci OH_ListInit(&executor->executeNode); 21569570cc8Sopenharmony_ci mgr->executorCount--; 21669570cc8Sopenharmony_ci } 21769570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 21869570cc8Sopenharmony_ci free(executor); 21969570cc8Sopenharmony_ci 22069570cc8Sopenharmony_ci node = task->executorList.next; 22169570cc8Sopenharmony_ci } 22269570cc8Sopenharmony_ci} 22369570cc8Sopenharmony_ci 22469570cc8Sopenharmony_cistatic void ExecuteTask(ThreadManager *mgr) 22569570cc8Sopenharmony_ci{ 22669570cc8Sopenharmony_ci TaskNode *task = PopTask(mgr, &mgr->waitingTaskQueue); 22769570cc8Sopenharmony_ci if (task == NULL) { 22869570cc8Sopenharmony_ci return; 22969570cc8Sopenharmony_ci } 23069570cc8Sopenharmony_ci 23169570cc8Sopenharmony_ci APPSPAWN_LOGV("ExecuteTask task: %{public}u ", task->taskId); 23269570cc8Sopenharmony_ci AddExecutor(mgr, task); 23369570cc8Sopenharmony_ci PushTask(mgr, task, &mgr->executingTaskQueue); 23469570cc8Sopenharmony_ci return; 23569570cc8Sopenharmony_ci} 23669570cc8Sopenharmony_ci 23769570cc8Sopenharmony_cistatic void CheckTaskComplete(ThreadManager *mgr) 23869570cc8Sopenharmony_ci{ 23969570cc8Sopenharmony_ci TaskNode *task = PopTask(mgr, &mgr->executingTaskQueue); 24069570cc8Sopenharmony_ci if (task == NULL) { 24169570cc8Sopenharmony_ci return; 24269570cc8Sopenharmony_ci } 24369570cc8Sopenharmony_ci if (task->totalTask <= atomic_load(&task->finishTaskCount)) { 24469570cc8Sopenharmony_ci if (task->finishProcess != NULL) { 24569570cc8Sopenharmony_ci task->finishProcess(task->taskId, task->context); 24669570cc8Sopenharmony_ci DeleteTask(task); 24769570cc8Sopenharmony_ci return; 24869570cc8Sopenharmony_ci } 24969570cc8Sopenharmony_ci pthread_mutex_lock(&task->mutex); 25069570cc8Sopenharmony_ci pthread_cond_signal(&task->cond); 25169570cc8Sopenharmony_ci pthread_mutex_unlock(&task->mutex); 25269570cc8Sopenharmony_ci return; 25369570cc8Sopenharmony_ci } 25469570cc8Sopenharmony_ci PushTask(mgr, task, &mgr->executingTaskQueue); 25569570cc8Sopenharmony_ci return; 25669570cc8Sopenharmony_ci} 25769570cc8Sopenharmony_ci 25869570cc8Sopenharmony_cistatic void TaskQueueDestroyProc(ListNode *node) 25969570cc8Sopenharmony_ci{ 26069570cc8Sopenharmony_ci OH_ListRemove(node); 26169570cc8Sopenharmony_ci TaskNode *task = ListEntry(node, TaskNode, node); 26269570cc8Sopenharmony_ci DeleteTask(task); 26369570cc8Sopenharmony_ci} 26469570cc8Sopenharmony_ci 26569570cc8Sopenharmony_ciint CreateThreadMgr(uint32_t maxThreadCount, ThreadMgr *instance) 26669570cc8Sopenharmony_ci{ 26769570cc8Sopenharmony_ci if (g_threadManager != NULL) { 26869570cc8Sopenharmony_ci *instance = (ThreadMgr)g_threadManager; 26969570cc8Sopenharmony_ci return 0; 27069570cc8Sopenharmony_ci } 27169570cc8Sopenharmony_ci 27269570cc8Sopenharmony_ci ThreadManager *mgr = (ThreadManager *)malloc(sizeof(ThreadManager) + maxThreadCount * sizeof(ThreadNode)); 27369570cc8Sopenharmony_ci APPSPAWN_CHECK(mgr != NULL, return -1, "Failed to create thread manager"); 27469570cc8Sopenharmony_ci 27569570cc8Sopenharmony_ci mgr->executorCount = 0; 27669570cc8Sopenharmony_ci mgr->currTaskId = 0; 27769570cc8Sopenharmony_ci mgr->validThreadCount = 0; 27869570cc8Sopenharmony_ci mgr->maxThreadCount = maxThreadCount; 27969570cc8Sopenharmony_ci OH_ListInit(&mgr->taskList); 28069570cc8Sopenharmony_ci OH_ListInit(&mgr->waitingTaskQueue); 28169570cc8Sopenharmony_ci OH_ListInit(&mgr->executingTaskQueue); 28269570cc8Sopenharmony_ci OH_ListInit(&mgr->executorQueue); 28369570cc8Sopenharmony_ci pthread_mutex_init(&mgr->mutex, NULL); 28469570cc8Sopenharmony_ci SetCondAttr(&mgr->cond); 28569570cc8Sopenharmony_ci 28669570cc8Sopenharmony_ci for (uint32_t index = 0; index < maxThreadCount + 1; index++) { 28769570cc8Sopenharmony_ci mgr->threadNode[index].index = index; 28869570cc8Sopenharmony_ci mgr->threadNode[index].threadId = INVALID_THREAD_ID; 28969570cc8Sopenharmony_ci atomic_init(&mgr->threadNode[index].threadExit, 0); 29069570cc8Sopenharmony_ci } 29169570cc8Sopenharmony_ci g_threadManager = mgr; 29269570cc8Sopenharmony_ci int ret = pthread_create(&mgr->threadNode[0].threadId, NULL, ManagerThreadProc, (void *)&mgr->threadNode[0]); 29369570cc8Sopenharmony_ci if (ret != 0) { 29469570cc8Sopenharmony_ci APPSPAWN_LOGE("Failed to create thread for manager"); 29569570cc8Sopenharmony_ci g_threadManager = NULL; 29669570cc8Sopenharmony_ci free(mgr); 29769570cc8Sopenharmony_ci return -1; 29869570cc8Sopenharmony_ci } 29969570cc8Sopenharmony_ci *instance = (ThreadMgr)mgr; 30069570cc8Sopenharmony_ci APPSPAWN_LOGV("Create thread manager success maxThreadCount: %{public}u", maxThreadCount); 30169570cc8Sopenharmony_ci return 0; 30269570cc8Sopenharmony_ci} 30369570cc8Sopenharmony_ci 30469570cc8Sopenharmony_ciint DestroyThreadMgr(ThreadMgr instance) 30569570cc8Sopenharmony_ci{ 30669570cc8Sopenharmony_ci APPSPAWN_LOGV("DestroyThreadMgr"); 30769570cc8Sopenharmony_ci ThreadManager *mgr = (ThreadManager *)instance; 30869570cc8Sopenharmony_ci APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 30969570cc8Sopenharmony_ci 31069570cc8Sopenharmony_ci for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) { 31169570cc8Sopenharmony_ci if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) { 31269570cc8Sopenharmony_ci atomic_store(&mgr->threadNode[index].threadExit, 1); 31369570cc8Sopenharmony_ci APPSPAWN_LOGV("DestroyThreadMgr index %{public}d %{public}d", index, mgr->threadNode[index].threadExit); 31469570cc8Sopenharmony_ci } 31569570cc8Sopenharmony_ci } 31669570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 31769570cc8Sopenharmony_ci pthread_cond_broadcast(&mgr->cond); 31869570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 31969570cc8Sopenharmony_ci for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) { 32069570cc8Sopenharmony_ci if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) { 32169570cc8Sopenharmony_ci pthread_join(mgr->threadNode[index].threadId, NULL); 32269570cc8Sopenharmony_ci APPSPAWN_LOGV("DestroyThreadMgr index %{public}d end", index); 32369570cc8Sopenharmony_ci } 32469570cc8Sopenharmony_ci } 32569570cc8Sopenharmony_ci 32669570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 32769570cc8Sopenharmony_ci OH_ListRemoveAll(&mgr->taskList, TaskQueueDestroyProc); 32869570cc8Sopenharmony_ci OH_ListRemoveAll(&mgr->waitingTaskQueue, TaskQueueDestroyProc); 32969570cc8Sopenharmony_ci OH_ListRemoveAll(&mgr->executingTaskQueue, TaskQueueDestroyProc); 33069570cc8Sopenharmony_ci OH_ListRemoveAll(&mgr->executorQueue, TaskQueueDestroyProc); 33169570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 33269570cc8Sopenharmony_ci 33369570cc8Sopenharmony_ci pthread_cond_destroy(&mgr->cond); 33469570cc8Sopenharmony_ci pthread_mutex_destroy(&mgr->mutex); 33569570cc8Sopenharmony_ci return 0; 33669570cc8Sopenharmony_ci} 33769570cc8Sopenharmony_ci 33869570cc8Sopenharmony_ciint ThreadMgrAddTask(ThreadMgr instance, ThreadTaskHandle *taskHandle) 33969570cc8Sopenharmony_ci{ 34069570cc8Sopenharmony_ci ThreadManager *mgr = (ThreadManager *)instance; 34169570cc8Sopenharmony_ci APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 34269570cc8Sopenharmony_ci TaskNode *task = (TaskNode *)malloc(sizeof(TaskNode)); 34369570cc8Sopenharmony_ci APPSPAWN_CHECK(task != NULL, return -1, "Failed to create thread task"); 34469570cc8Sopenharmony_ci 34569570cc8Sopenharmony_ci task->context = NULL; 34669570cc8Sopenharmony_ci task->finishProcess = NULL; 34769570cc8Sopenharmony_ci task->totalTask = 0; 34869570cc8Sopenharmony_ci atomic_init(&task->taskFlags, 0); 34969570cc8Sopenharmony_ci atomic_init(&task->finishTaskCount, 0); 35069570cc8Sopenharmony_ci OH_ListInit(&task->node); 35169570cc8Sopenharmony_ci OH_ListInit(&task->executorList); 35269570cc8Sopenharmony_ci pthread_mutex_init(&task->mutex, NULL); 35369570cc8Sopenharmony_ci SetCondAttr(&task->cond); 35469570cc8Sopenharmony_ci 35569570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 35669570cc8Sopenharmony_ci task->taskId = mgr->currTaskId++; 35769570cc8Sopenharmony_ci OH_ListAddTail(&mgr->taskList, &task->node); 35869570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 35969570cc8Sopenharmony_ci *taskHandle = task->taskId; 36069570cc8Sopenharmony_ci APPSPAWN_LOGV("Create thread task success task id: %{public}u", task->taskId); 36169570cc8Sopenharmony_ci return 0; 36269570cc8Sopenharmony_ci} 36369570cc8Sopenharmony_ci 36469570cc8Sopenharmony_ciint ThreadMgrAddExecutor(ThreadMgr instance, 36569570cc8Sopenharmony_ci ThreadTaskHandle taskHandle, TaskExecutor executor, const ThreadContext *context) 36669570cc8Sopenharmony_ci{ 36769570cc8Sopenharmony_ci ThreadManager *mgr = (ThreadManager *)instance; 36869570cc8Sopenharmony_ci APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 36969570cc8Sopenharmony_ci TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle); 37069570cc8Sopenharmony_ci APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle); 37169570cc8Sopenharmony_ci 37269570cc8Sopenharmony_ci TaskExecuteNode *node = (TaskExecuteNode *)malloc(sizeof(TaskExecuteNode)); 37369570cc8Sopenharmony_ci APPSPAWN_CHECK(node != NULL, return -1, "Failed to create thread executor for task %{public}u", taskHandle); 37469570cc8Sopenharmony_ci node->task = task; 37569570cc8Sopenharmony_ci OH_ListInit(&node->node); 37669570cc8Sopenharmony_ci OH_ListInit(&node->executeNode); 37769570cc8Sopenharmony_ci node->context = context; 37869570cc8Sopenharmony_ci node->executor = executor; 37969570cc8Sopenharmony_ci task->totalTask++; 38069570cc8Sopenharmony_ci OH_ListAddTail(&task->executorList, &node->node); 38169570cc8Sopenharmony_ci return 0; 38269570cc8Sopenharmony_ci} 38369570cc8Sopenharmony_ci 38469570cc8Sopenharmony_ciint ThreadMgrCancelTask(ThreadMgr instance, ThreadTaskHandle taskHandle) 38569570cc8Sopenharmony_ci{ 38669570cc8Sopenharmony_ci ThreadManager *mgr = (ThreadManager *)instance; 38769570cc8Sopenharmony_ci APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 38869570cc8Sopenharmony_ci TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle); 38969570cc8Sopenharmony_ci if (task != NULL) { 39069570cc8Sopenharmony_ci SafeRemoveTask(mgr, task); 39169570cc8Sopenharmony_ci DeleteTask(task); 39269570cc8Sopenharmony_ci return 0; 39369570cc8Sopenharmony_ci } 39469570cc8Sopenharmony_ci task = GetTask(mgr, &mgr->waitingTaskQueue, taskHandle); 39569570cc8Sopenharmony_ci if (task != NULL) { 39669570cc8Sopenharmony_ci SafeRemoveTask(mgr, task); 39769570cc8Sopenharmony_ci DeleteTask(task); 39869570cc8Sopenharmony_ci return 0; 39969570cc8Sopenharmony_ci } 40069570cc8Sopenharmony_ci task = GetTask(mgr, &mgr->executingTaskQueue, taskHandle); 40169570cc8Sopenharmony_ci if (task != NULL) { 40269570cc8Sopenharmony_ci SafeRemoveTask(mgr, task); 40369570cc8Sopenharmony_ci DeleteTask(task); 40469570cc8Sopenharmony_ci return 0; 40569570cc8Sopenharmony_ci } 40669570cc8Sopenharmony_ci return 0; 40769570cc8Sopenharmony_ci} 40869570cc8Sopenharmony_ci 40969570cc8Sopenharmony_ciint TaskSyncExecute(ThreadMgr instance, ThreadTaskHandle taskHandle) 41069570cc8Sopenharmony_ci{ 41169570cc8Sopenharmony_ci ThreadManager *mgr = (ThreadManager *)instance; 41269570cc8Sopenharmony_ci APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 41369570cc8Sopenharmony_ci TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle); 41469570cc8Sopenharmony_ci APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle); 41569570cc8Sopenharmony_ci 41669570cc8Sopenharmony_ci pthread_mutex_lock(&task->mutex); 41769570cc8Sopenharmony_ci OH_ListRemove(&task->node); 41869570cc8Sopenharmony_ci OH_ListInit(&task->node); 41969570cc8Sopenharmony_ci OH_ListAddTail(&mgr->waitingTaskQueue, &task->node); 42069570cc8Sopenharmony_ci pthread_cond_broadcast(&mgr->cond); 42169570cc8Sopenharmony_ci pthread_mutex_unlock(&task->mutex); 42269570cc8Sopenharmony_ci APPSPAWN_LOGV("TaskSyncExecute task: %{public}u", task->taskId); 42369570cc8Sopenharmony_ci struct timespec abstime; 42469570cc8Sopenharmony_ci int ret = 0; 42569570cc8Sopenharmony_ci do { 42669570cc8Sopenharmony_ci ConvertToTimespec(60 * 1000, &abstime); // wait 60 * 1000 60s 42769570cc8Sopenharmony_ci pthread_mutex_lock(&task->mutex); 42869570cc8Sopenharmony_ci ret = pthread_cond_timedwait(&task->cond, &task->mutex, &abstime); 42969570cc8Sopenharmony_ci pthread_mutex_unlock(&task->mutex); 43069570cc8Sopenharmony_ci APPSPAWN_LOGV("TaskSyncExecute success task id: %{public}u ret: %{public}d", task->taskId, ret); 43169570cc8Sopenharmony_ci } while (ret == ETIMEDOUT); 43269570cc8Sopenharmony_ci 43369570cc8Sopenharmony_ci DeleteTask(task); 43469570cc8Sopenharmony_ci return ret; 43569570cc8Sopenharmony_ci} 43669570cc8Sopenharmony_ci 43769570cc8Sopenharmony_ciint TaskExecute(ThreadMgr instance, 43869570cc8Sopenharmony_ci ThreadTaskHandle taskHandle, TaskFinishProcessor process, const ThreadContext *context) 43969570cc8Sopenharmony_ci{ 44069570cc8Sopenharmony_ci ThreadManager *mgr = (ThreadManager *)instance; 44169570cc8Sopenharmony_ci APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager"); 44269570cc8Sopenharmony_ci TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle); 44369570cc8Sopenharmony_ci APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle); 44469570cc8Sopenharmony_ci 44569570cc8Sopenharmony_ci task->finishProcess = process; 44669570cc8Sopenharmony_ci task->context = context; 44769570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 44869570cc8Sopenharmony_ci OH_ListRemove(&task->node); 44969570cc8Sopenharmony_ci OH_ListInit(&task->node); 45069570cc8Sopenharmony_ci OH_ListAddTail(&mgr->waitingTaskQueue, &task->node); 45169570cc8Sopenharmony_ci pthread_cond_broadcast(&mgr->cond); 45269570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 45369570cc8Sopenharmony_ci APPSPAWN_LOGV("TaskExecute task: %{public}u", task->taskId); 45469570cc8Sopenharmony_ci return 0; 45569570cc8Sopenharmony_ci} 45669570cc8Sopenharmony_ci 45769570cc8Sopenharmony_cistatic void CheckAndCreateNewThread(ThreadManager *mgr) 45869570cc8Sopenharmony_ci{ 45969570cc8Sopenharmony_ci if (mgr->maxThreadCount <= mgr->validThreadCount) { 46069570cc8Sopenharmony_ci return; 46169570cc8Sopenharmony_ci } 46269570cc8Sopenharmony_ci if (mgr->executorCount <= mgr->validThreadCount) { 46369570cc8Sopenharmony_ci return; 46469570cc8Sopenharmony_ci } 46569570cc8Sopenharmony_ci APPSPAWN_LOGV("CheckAndCreateNewThread maxThreadCount: %{public}u validThreadCount: %{public}u %{public}u", 46669570cc8Sopenharmony_ci mgr->maxThreadCount, mgr->validThreadCount, mgr->executorCount); 46769570cc8Sopenharmony_ci 46869570cc8Sopenharmony_ci uint32_t totalThread = mgr->maxThreadCount; 46969570cc8Sopenharmony_ci if (mgr->executorCount <= mgr->maxThreadCount) { 47069570cc8Sopenharmony_ci totalThread = mgr->executorCount; 47169570cc8Sopenharmony_ci } 47269570cc8Sopenharmony_ci 47369570cc8Sopenharmony_ci for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) { 47469570cc8Sopenharmony_ci if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) { 47569570cc8Sopenharmony_ci continue; 47669570cc8Sopenharmony_ci } 47769570cc8Sopenharmony_ci int ret = pthread_create(&mgr->threadNode[index].threadId, 47869570cc8Sopenharmony_ci NULL, ThreadExecute, (void *)&(mgr->threadNode[index])); 47969570cc8Sopenharmony_ci APPSPAWN_CHECK(ret == 0, return, "Failed to create thread for %{public}u", index); 48069570cc8Sopenharmony_ci APPSPAWN_LOGV("Create thread success index: %{public}u", mgr->threadNode[index].index); 48169570cc8Sopenharmony_ci mgr->validThreadCount++; 48269570cc8Sopenharmony_ci if (mgr->validThreadCount >= totalThread) { 48369570cc8Sopenharmony_ci return; 48469570cc8Sopenharmony_ci } 48569570cc8Sopenharmony_ci } 48669570cc8Sopenharmony_ci return; 48769570cc8Sopenharmony_ci} 48869570cc8Sopenharmony_ci 48969570cc8Sopenharmony_cistatic void *ManagerThreadProc(void *args) 49069570cc8Sopenharmony_ci{ 49169570cc8Sopenharmony_ci ThreadManager *mgr = g_threadManager; 49269570cc8Sopenharmony_ci ThreadNode *threadNode = (ThreadNode *)args; 49369570cc8Sopenharmony_ci struct timespec abstime; 49469570cc8Sopenharmony_ci while (!threadNode->threadExit) { 49569570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 49669570cc8Sopenharmony_ci do { 49769570cc8Sopenharmony_ci uint32_t timeout = 60 * 1000; // 60 * 1000 60s 49869570cc8Sopenharmony_ci if (!ListEmpty(mgr->waitingTaskQueue)) { 49969570cc8Sopenharmony_ci break; 50069570cc8Sopenharmony_ci } 50169570cc8Sopenharmony_ci if (!ListEmpty(mgr->executingTaskQueue)) { 50269570cc8Sopenharmony_ci timeout = 500; // 500ms 50369570cc8Sopenharmony_ci } 50469570cc8Sopenharmony_ci ConvertToTimespec(timeout, &abstime); 50569570cc8Sopenharmony_ci int ret = pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime); 50669570cc8Sopenharmony_ci if (!ListEmpty(mgr->executingTaskQueue) || ret == ETIMEDOUT) { 50769570cc8Sopenharmony_ci break; 50869570cc8Sopenharmony_ci } 50969570cc8Sopenharmony_ci if (threadNode->threadExit) { 51069570cc8Sopenharmony_ci break; 51169570cc8Sopenharmony_ci } 51269570cc8Sopenharmony_ci } while (1); 51369570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 51469570cc8Sopenharmony_ci 51569570cc8Sopenharmony_ci ExecuteTask(mgr); 51669570cc8Sopenharmony_ci CheckAndCreateNewThread(mgr); 51769570cc8Sopenharmony_ci 51869570cc8Sopenharmony_ci if (mgr->validThreadCount == 0) { 51969570cc8Sopenharmony_ci RunExecutor(mgr, threadNode, 5); // 5 max thread 52069570cc8Sopenharmony_ci } 52169570cc8Sopenharmony_ci CheckTaskComplete(mgr); 52269570cc8Sopenharmony_ci } 52369570cc8Sopenharmony_ci return 0; 52469570cc8Sopenharmony_ci} 52569570cc8Sopenharmony_ci 52669570cc8Sopenharmony_cistatic void *ThreadExecute(void *args) 52769570cc8Sopenharmony_ci{ 52869570cc8Sopenharmony_ci ThreadManager *mgr = g_threadManager; 52969570cc8Sopenharmony_ci ThreadNode *threadNode = (ThreadNode *)args; 53069570cc8Sopenharmony_ci struct timespec abstime; 53169570cc8Sopenharmony_ci while (!threadNode->threadExit) { 53269570cc8Sopenharmony_ci pthread_mutex_lock(&mgr->mutex); 53369570cc8Sopenharmony_ci while (ListEmpty(mgr->executorQueue) && !threadNode->threadExit) { 53469570cc8Sopenharmony_ci ConvertToTimespec(60 * 1000, &abstime); // 60 * 1000 60s 53569570cc8Sopenharmony_ci pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime); 53669570cc8Sopenharmony_ci } 53769570cc8Sopenharmony_ci pthread_mutex_unlock(&mgr->mutex); 53869570cc8Sopenharmony_ci APPSPAWN_LOGV("bbbb threadNode->threadExit %{public}d", threadNode->threadExit); 53969570cc8Sopenharmony_ci RunExecutor(mgr, threadNode, 1); 54069570cc8Sopenharmony_ci } 54169570cc8Sopenharmony_ci return NULL; 54269570cc8Sopenharmony_ci} 543