/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
1569570cc8Sopenharmony_ci#include "thread_manager.h"
1669570cc8Sopenharmony_ci
1769570cc8Sopenharmony_ci#include <errno.h>
1869570cc8Sopenharmony_ci#include <pthread.h>
1969570cc8Sopenharmony_ci#include <stdatomic.h>
2069570cc8Sopenharmony_ci
2169570cc8Sopenharmony_ci#include "appspawn_utils.h"
2269570cc8Sopenharmony_ci#include "list.h"
2369570cc8Sopenharmony_ci
/* Bookkeeping for one thread slot of the manager (see ThreadManager.threadNode). */
typedef struct {
    /* Set to 1 by DestroyThreadMgr; the thread loops poll it and leave when it is non-zero. */
    atomic_uint threadExit;
    /* Position of this slot inside ThreadManager.threadNode. */
    uint32_t index;
    /* pthread handle of the slot, INVALID_THREAD_ID while no thread has been created for it. */
    pthread_t threadId;
} ThreadNode;
2969570cc8Sopenharmony_ci
3069570cc8Sopenharmony_citypedef struct {
3169570cc8Sopenharmony_ci    pthread_mutex_t mutex;        // 保护执行队列
3269570cc8Sopenharmony_ci    pthread_cond_t cond;          // 线程等待条件
3369570cc8Sopenharmony_ci    ListNode taskList;            // 任务队列,任务还没有启动
3469570cc8Sopenharmony_ci    ListNode waitingTaskQueue;    // 启动的任务,排队等待执行
3569570cc8Sopenharmony_ci    ListNode executingTaskQueue;  // 正在执行
3669570cc8Sopenharmony_ci    ListNode executorQueue;       // 执行节点,保存 TaskExecuteNode
3769570cc8Sopenharmony_ci    uint32_t executorCount;
3869570cc8Sopenharmony_ci    uint32_t maxThreadCount;
3969570cc8Sopenharmony_ci    uint32_t currTaskId;
4069570cc8Sopenharmony_ci    struct timespec lastAdjust;
4169570cc8Sopenharmony_ci    uint32_t validThreadCount;
4269570cc8Sopenharmony_ci    ThreadNode threadNode[1];  // 线程信息,控制线程的退出和结束
4369570cc8Sopenharmony_ci} ThreadManager;
4469570cc8Sopenharmony_ci
4569570cc8Sopenharmony_citypedef struct {
4669570cc8Sopenharmony_ci    uint32_t taskId;
4769570cc8Sopenharmony_ci    ListNode node;
4869570cc8Sopenharmony_ci    ListNode executorList;
4969570cc8Sopenharmony_ci    uint32_t totalTask;
5069570cc8Sopenharmony_ci    atomic_uint taskFlags;  // 表示任务是否被取消,各线程检查后决定任务线程是否结束
5169570cc8Sopenharmony_ci    atomic_uint finishTaskCount;
5269570cc8Sopenharmony_ci    const ThreadContext *context;
5369570cc8Sopenharmony_ci    TaskFinishProcessor finishProcess;
5469570cc8Sopenharmony_ci    pthread_mutex_t mutex;  // 保护执行队列
5569570cc8Sopenharmony_ci    pthread_cond_t cond;    // 同步执行时,等待确认
5669570cc8Sopenharmony_ci} TaskNode;
5769570cc8Sopenharmony_ci
5869570cc8Sopenharmony_citypedef struct {
5969570cc8Sopenharmony_ci    ListNode node;         // 保存sub task到对应的task,方便管理
6069570cc8Sopenharmony_ci    ListNode executeNode;  // 等待处理的任务节点
6169570cc8Sopenharmony_ci    TaskNode *task;
6269570cc8Sopenharmony_ci    const ThreadContext *context;
6369570cc8Sopenharmony_ci    TaskExecutor executor;
6469570cc8Sopenharmony_ci} TaskExecuteNode;
6569570cc8Sopenharmony_ci
6669570cc8Sopenharmony_cistatic ThreadManager *g_threadManager = NULL;
6769570cc8Sopenharmony_ci
6869570cc8Sopenharmony_cistatic void *ManagerThreadProc(void *args);
6969570cc8Sopenharmony_cistatic void *ThreadExecute(void *args);
7069570cc8Sopenharmony_ci
/*
 * Initialise *cond so that its timed waits are measured against CLOCK_MONOTONIC,
 * matching the deadlines produced by ConvertToTimespec.
 */
static void SetCondAttr(pthread_cond_t *cond)
{
    pthread_condattr_t monotonicAttr;

    pthread_condattr_init(&monotonicAttr);
    pthread_condattr_setclock(&monotonicAttr, CLOCK_MONOTONIC);
    pthread_cond_init(cond, &monotonicAttr);

    // the attribute object is only needed while initialising the condition
    pthread_condattr_destroy(&monotonicAttr);
}
7969570cc8Sopenharmony_ci
8069570cc8Sopenharmony_cistatic void ConvertToTimespec(int time, struct timespec *tm)
8169570cc8Sopenharmony_ci{
8269570cc8Sopenharmony_ci    struct timespec start;
8369570cc8Sopenharmony_ci    clock_gettime(CLOCK_MONOTONIC, &start);
8469570cc8Sopenharmony_ci    uint64_t ns = time;
8569570cc8Sopenharmony_ci    ns *= APPSPAWN_MSEC_TO_NSEC;
8669570cc8Sopenharmony_ci    ns += start.tv_sec * APPSPAWN_SEC_TO_NSEC + start.tv_nsec;
8769570cc8Sopenharmony_ci    tm->tv_sec = ns / APPSPAWN_SEC_TO_NSEC;
8869570cc8Sopenharmony_ci    tm->tv_nsec = ns % APPSPAWN_SEC_TO_NSEC;
8969570cc8Sopenharmony_ci}
9069570cc8Sopenharmony_ci
9169570cc8Sopenharmony_cistatic TaskExecuteNode *PopTaskExecutor(ThreadManager *mgr)
9269570cc8Sopenharmony_ci{
9369570cc8Sopenharmony_ci    TaskExecuteNode *executor = NULL;
9469570cc8Sopenharmony_ci    pthread_mutex_lock(&mgr->mutex);
9569570cc8Sopenharmony_ci    ListNode *node = mgr->executorQueue.next;
9669570cc8Sopenharmony_ci    if (node != &mgr->executorQueue) {
9769570cc8Sopenharmony_ci        OH_ListRemove(node);
9869570cc8Sopenharmony_ci        OH_ListInit(node);
9969570cc8Sopenharmony_ci        executor = ListEntry(node, TaskExecuteNode, executeNode);
10069570cc8Sopenharmony_ci        mgr->executorCount--;
10169570cc8Sopenharmony_ci    }
10269570cc8Sopenharmony_ci    pthread_mutex_unlock(&mgr->mutex);
10369570cc8Sopenharmony_ci    return executor;
10469570cc8Sopenharmony_ci}
10569570cc8Sopenharmony_ci
10669570cc8Sopenharmony_cistatic int AddExecutor(ThreadManager *mgr, const TaskNode *task)
10769570cc8Sopenharmony_ci{
10869570cc8Sopenharmony_ci    ListNode *node = task->executorList.next;
10969570cc8Sopenharmony_ci    while (node != &task->executorList) {
11069570cc8Sopenharmony_ci        TaskExecuteNode *executor = ListEntry(node, TaskExecuteNode, node);
11169570cc8Sopenharmony_ci        APPSPAWN_LOGV("AddExecutor task: %{public}u executorCount: %{public}u executor: %{public}u",
11269570cc8Sopenharmony_ci            task->taskId, mgr->executorCount, executor->task->taskId);
11369570cc8Sopenharmony_ci
11469570cc8Sopenharmony_ci        // 插入尾部执行
11569570cc8Sopenharmony_ci        pthread_mutex_lock(&mgr->mutex);
11669570cc8Sopenharmony_ci        OH_ListRemove(&executor->executeNode);
11769570cc8Sopenharmony_ci        OH_ListInit(&executor->executeNode);
11869570cc8Sopenharmony_ci        OH_ListAddTail(&mgr->executorQueue, &executor->executeNode);
11969570cc8Sopenharmony_ci        mgr->executorCount++;
12069570cc8Sopenharmony_ci        pthread_mutex_unlock(&mgr->mutex);
12169570cc8Sopenharmony_ci
12269570cc8Sopenharmony_ci        node = node->next;
12369570cc8Sopenharmony_ci    }
12469570cc8Sopenharmony_ci    return 0;
12569570cc8Sopenharmony_ci}
12669570cc8Sopenharmony_ci
12769570cc8Sopenharmony_cistatic void RunExecutor(ThreadManager *mgr, ThreadNode *threadNode, uint32_t maxCount)
12869570cc8Sopenharmony_ci{
12969570cc8Sopenharmony_ci    APPSPAWN_LOGV("RunExecutor in thread: %{public}d executorCount: %{public}u ",
13069570cc8Sopenharmony_ci        threadNode->index, mgr->executorCount);
13169570cc8Sopenharmony_ci    TaskExecuteNode *executor = PopTaskExecutor(mgr);
13269570cc8Sopenharmony_ci    uint32_t count = 0;
13369570cc8Sopenharmony_ci    while (executor != NULL && !threadNode->threadExit) {
13469570cc8Sopenharmony_ci        APPSPAWN_LOGV("RunExecutor task: %{public}u", executor->task->taskId);
13569570cc8Sopenharmony_ci        atomic_fetch_add(&executor->task->finishTaskCount, 1);
13669570cc8Sopenharmony_ci        executor->executor(executor->task->taskId, executor->context);
13769570cc8Sopenharmony_ci        count++;
13869570cc8Sopenharmony_ci        if (count >= maxCount) {
13969570cc8Sopenharmony_ci            break;
14069570cc8Sopenharmony_ci        }
14169570cc8Sopenharmony_ci        executor = PopTaskExecutor(mgr);
14269570cc8Sopenharmony_ci    }
14369570cc8Sopenharmony_ci    APPSPAWN_LOGV("RunExecutor executorCount: %{public}u end", mgr->executorCount);
14469570cc8Sopenharmony_ci}
14569570cc8Sopenharmony_ci
14669570cc8Sopenharmony_cistatic int TaskCompareTaskId(ListNode *node, void *data)
14769570cc8Sopenharmony_ci{
14869570cc8Sopenharmony_ci    TaskNode *task = ListEntry(node, TaskNode, node);
14969570cc8Sopenharmony_ci    return task->taskId - *(uint32_t *)data;
15069570cc8Sopenharmony_ci}
15169570cc8Sopenharmony_ci
15269570cc8Sopenharmony_cistatic TaskNode *GetTask(ThreadManager *mgr, ListNode *queue, uint32_t taskId)
15369570cc8Sopenharmony_ci{
15469570cc8Sopenharmony_ci    ListNode *node = NULL;
15569570cc8Sopenharmony_ci    pthread_mutex_lock(&mgr->mutex);
15669570cc8Sopenharmony_ci    node = OH_ListFind(queue, &taskId, TaskCompareTaskId);
15769570cc8Sopenharmony_ci    pthread_mutex_unlock(&mgr->mutex);
15869570cc8Sopenharmony_ci    if (node == NULL) {
15969570cc8Sopenharmony_ci        return NULL;
16069570cc8Sopenharmony_ci    }
16169570cc8Sopenharmony_ci    return ListEntry(node, TaskNode, node);
16269570cc8Sopenharmony_ci}
16369570cc8Sopenharmony_ci
16469570cc8Sopenharmony_cistatic void DeleteTask(TaskNode *task)
16569570cc8Sopenharmony_ci{
16669570cc8Sopenharmony_ci    APPSPAWN_LOGV("DeleteTask task: %{public}u ", task->taskId);
16769570cc8Sopenharmony_ci
16869570cc8Sopenharmony_ci    if (!ListEmpty(task->node)) {
16969570cc8Sopenharmony_ci        return;
17069570cc8Sopenharmony_ci    }
17169570cc8Sopenharmony_ci    OH_ListRemoveAll(&task->executorList, NULL);
17269570cc8Sopenharmony_ci    pthread_cond_destroy(&task->cond);
17369570cc8Sopenharmony_ci    pthread_mutex_destroy(&task->mutex);
17469570cc8Sopenharmony_ci    free(task);
17569570cc8Sopenharmony_ci}
17669570cc8Sopenharmony_ci
17769570cc8Sopenharmony_cistatic TaskNode *PopTask(ThreadManager *mgr, ListNode *queue)
17869570cc8Sopenharmony_ci{
17969570cc8Sopenharmony_ci    TaskNode *task = NULL;
18069570cc8Sopenharmony_ci    pthread_mutex_lock(&mgr->mutex);
18169570cc8Sopenharmony_ci    ListNode *node = queue->next;
18269570cc8Sopenharmony_ci    if (node != queue) {
18369570cc8Sopenharmony_ci        OH_ListRemove(node);
18469570cc8Sopenharmony_ci        OH_ListInit(node);
18569570cc8Sopenharmony_ci        task = ListEntry(node, TaskNode, node);
18669570cc8Sopenharmony_ci    }
18769570cc8Sopenharmony_ci    pthread_mutex_unlock(&mgr->mutex);
18869570cc8Sopenharmony_ci    return task;
18969570cc8Sopenharmony_ci}
19069570cc8Sopenharmony_ci
19169570cc8Sopenharmony_cistatic void PushTask(ThreadManager *mgr, TaskNode *task, ListNode *queue)
19269570cc8Sopenharmony_ci{
19369570cc8Sopenharmony_ci    pthread_mutex_lock(&mgr->mutex);
19469570cc8Sopenharmony_ci    OH_ListAddTail(queue, &task->node);
19569570cc8Sopenharmony_ci    pthread_cond_broadcast(&mgr->cond);
19669570cc8Sopenharmony_ci    pthread_mutex_unlock(&mgr->mutex);
19769570cc8Sopenharmony_ci}
19869570cc8Sopenharmony_ci
19969570cc8Sopenharmony_cistatic void SafeRemoveTask(ThreadManager *mgr, TaskNode *task)
20069570cc8Sopenharmony_ci{
20169570cc8Sopenharmony_ci    pthread_mutex_lock(&mgr->mutex);
20269570cc8Sopenharmony_ci    OH_ListRemove(&task->node);
20369570cc8Sopenharmony_ci    OH_ListInit(&task->node);
20469570cc8Sopenharmony_ci    pthread_mutex_unlock(&mgr->mutex);
20569570cc8Sopenharmony_ci
20669570cc8Sopenharmony_ci    ListNode *node = task->executorList.next;
20769570cc8Sopenharmony_ci    while (node != &task->executorList) {
20869570cc8Sopenharmony_ci        OH_ListRemove(node);
20969570cc8Sopenharmony_ci        OH_ListInit(node);
21069570cc8Sopenharmony_ci        TaskExecuteNode *executor = ListEntry(node, TaskExecuteNode, node);
21169570cc8Sopenharmony_ci        pthread_mutex_lock(&mgr->mutex);
21269570cc8Sopenharmony_ci        if (!ListEmpty(executor->executeNode)) {
21369570cc8Sopenharmony_ci            OH_ListRemove(&executor->executeNode);
21469570cc8Sopenharmony_ci            OH_ListInit(&executor->executeNode);
21569570cc8Sopenharmony_ci            mgr->executorCount--;
21669570cc8Sopenharmony_ci        }
21769570cc8Sopenharmony_ci        pthread_mutex_unlock(&mgr->mutex);
21869570cc8Sopenharmony_ci        free(executor);
21969570cc8Sopenharmony_ci
22069570cc8Sopenharmony_ci        node = task->executorList.next;
22169570cc8Sopenharmony_ci    }
22269570cc8Sopenharmony_ci}
22369570cc8Sopenharmony_ci
22469570cc8Sopenharmony_cistatic void ExecuteTask(ThreadManager *mgr)
22569570cc8Sopenharmony_ci{
22669570cc8Sopenharmony_ci    TaskNode *task = PopTask(mgr, &mgr->waitingTaskQueue);
22769570cc8Sopenharmony_ci    if (task == NULL) {
22869570cc8Sopenharmony_ci        return;
22969570cc8Sopenharmony_ci    }
23069570cc8Sopenharmony_ci
23169570cc8Sopenharmony_ci    APPSPAWN_LOGV("ExecuteTask task: %{public}u ", task->taskId);
23269570cc8Sopenharmony_ci    AddExecutor(mgr, task);
23369570cc8Sopenharmony_ci    PushTask(mgr, task, &mgr->executingTaskQueue);
23469570cc8Sopenharmony_ci    return;
23569570cc8Sopenharmony_ci}
23669570cc8Sopenharmony_ci
23769570cc8Sopenharmony_cistatic void CheckTaskComplete(ThreadManager *mgr)
23869570cc8Sopenharmony_ci{
23969570cc8Sopenharmony_ci    TaskNode *task = PopTask(mgr, &mgr->executingTaskQueue);
24069570cc8Sopenharmony_ci    if (task == NULL) {
24169570cc8Sopenharmony_ci        return;
24269570cc8Sopenharmony_ci    }
24369570cc8Sopenharmony_ci    if (task->totalTask <= atomic_load(&task->finishTaskCount)) {
24469570cc8Sopenharmony_ci        if (task->finishProcess != NULL) {
24569570cc8Sopenharmony_ci            task->finishProcess(task->taskId, task->context);
24669570cc8Sopenharmony_ci            DeleteTask(task);
24769570cc8Sopenharmony_ci            return;
24869570cc8Sopenharmony_ci        }
24969570cc8Sopenharmony_ci        pthread_mutex_lock(&task->mutex);
25069570cc8Sopenharmony_ci        pthread_cond_signal(&task->cond);
25169570cc8Sopenharmony_ci        pthread_mutex_unlock(&task->mutex);
25269570cc8Sopenharmony_ci        return;
25369570cc8Sopenharmony_ci    }
25469570cc8Sopenharmony_ci    PushTask(mgr, task, &mgr->executingTaskQueue);
25569570cc8Sopenharmony_ci    return;
25669570cc8Sopenharmony_ci}
25769570cc8Sopenharmony_ci
25869570cc8Sopenharmony_cistatic void TaskQueueDestroyProc(ListNode *node)
25969570cc8Sopenharmony_ci{
26069570cc8Sopenharmony_ci    OH_ListRemove(node);
26169570cc8Sopenharmony_ci    TaskNode *task = ListEntry(node, TaskNode, node);
26269570cc8Sopenharmony_ci    DeleteTask(task);
26369570cc8Sopenharmony_ci}
26469570cc8Sopenharmony_ci
26569570cc8Sopenharmony_ciint CreateThreadMgr(uint32_t maxThreadCount, ThreadMgr *instance)
26669570cc8Sopenharmony_ci{
26769570cc8Sopenharmony_ci    if (g_threadManager != NULL) {
26869570cc8Sopenharmony_ci        *instance = (ThreadMgr)g_threadManager;
26969570cc8Sopenharmony_ci        return 0;
27069570cc8Sopenharmony_ci    }
27169570cc8Sopenharmony_ci
27269570cc8Sopenharmony_ci    ThreadManager *mgr = (ThreadManager *)malloc(sizeof(ThreadManager) + maxThreadCount * sizeof(ThreadNode));
27369570cc8Sopenharmony_ci    APPSPAWN_CHECK(mgr != NULL, return -1, "Failed to create thread manager");
27469570cc8Sopenharmony_ci
27569570cc8Sopenharmony_ci    mgr->executorCount = 0;
27669570cc8Sopenharmony_ci    mgr->currTaskId = 0;
27769570cc8Sopenharmony_ci    mgr->validThreadCount = 0;
27869570cc8Sopenharmony_ci    mgr->maxThreadCount = maxThreadCount;
27969570cc8Sopenharmony_ci    OH_ListInit(&mgr->taskList);
28069570cc8Sopenharmony_ci    OH_ListInit(&mgr->waitingTaskQueue);
28169570cc8Sopenharmony_ci    OH_ListInit(&mgr->executingTaskQueue);
28269570cc8Sopenharmony_ci    OH_ListInit(&mgr->executorQueue);
28369570cc8Sopenharmony_ci    pthread_mutex_init(&mgr->mutex, NULL);
28469570cc8Sopenharmony_ci    SetCondAttr(&mgr->cond);
28569570cc8Sopenharmony_ci
28669570cc8Sopenharmony_ci    for (uint32_t index = 0; index < maxThreadCount + 1; index++) {
28769570cc8Sopenharmony_ci        mgr->threadNode[index].index = index;
28869570cc8Sopenharmony_ci        mgr->threadNode[index].threadId = INVALID_THREAD_ID;
28969570cc8Sopenharmony_ci        atomic_init(&mgr->threadNode[index].threadExit, 0);
29069570cc8Sopenharmony_ci    }
29169570cc8Sopenharmony_ci    g_threadManager = mgr;
29269570cc8Sopenharmony_ci    int ret = pthread_create(&mgr->threadNode[0].threadId, NULL, ManagerThreadProc, (void *)&mgr->threadNode[0]);
29369570cc8Sopenharmony_ci    if (ret != 0) {
29469570cc8Sopenharmony_ci        APPSPAWN_LOGE("Failed to create thread for manager");
29569570cc8Sopenharmony_ci        g_threadManager = NULL;
29669570cc8Sopenharmony_ci        free(mgr);
29769570cc8Sopenharmony_ci        return -1;
29869570cc8Sopenharmony_ci    }
29969570cc8Sopenharmony_ci    *instance = (ThreadMgr)mgr;
30069570cc8Sopenharmony_ci    APPSPAWN_LOGV("Create thread manager success maxThreadCount: %{public}u", maxThreadCount);
30169570cc8Sopenharmony_ci    return 0;
30269570cc8Sopenharmony_ci}
30369570cc8Sopenharmony_ci
30469570cc8Sopenharmony_ciint DestroyThreadMgr(ThreadMgr instance)
30569570cc8Sopenharmony_ci{
30669570cc8Sopenharmony_ci    APPSPAWN_LOGV("DestroyThreadMgr");
30769570cc8Sopenharmony_ci    ThreadManager *mgr = (ThreadManager *)instance;
30869570cc8Sopenharmony_ci    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
30969570cc8Sopenharmony_ci
31069570cc8Sopenharmony_ci    for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
31169570cc8Sopenharmony_ci        if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
31269570cc8Sopenharmony_ci            atomic_store(&mgr->threadNode[index].threadExit, 1);
31369570cc8Sopenharmony_ci            APPSPAWN_LOGV("DestroyThreadMgr index %{public}d %{public}d", index, mgr->threadNode[index].threadExit);
31469570cc8Sopenharmony_ci        }
31569570cc8Sopenharmony_ci    }
31669570cc8Sopenharmony_ci    pthread_mutex_lock(&mgr->mutex);
31769570cc8Sopenharmony_ci    pthread_cond_broadcast(&mgr->cond);
31869570cc8Sopenharmony_ci    pthread_mutex_unlock(&mgr->mutex);
31969570cc8Sopenharmony_ci    for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
32069570cc8Sopenharmony_ci        if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
32169570cc8Sopenharmony_ci            pthread_join(mgr->threadNode[index].threadId, NULL);
32269570cc8Sopenharmony_ci            APPSPAWN_LOGV("DestroyThreadMgr index %{public}d end", index);
32369570cc8Sopenharmony_ci        }
32469570cc8Sopenharmony_ci    }
32569570cc8Sopenharmony_ci
32669570cc8Sopenharmony_ci    pthread_mutex_lock(&mgr->mutex);
32769570cc8Sopenharmony_ci    OH_ListRemoveAll(&mgr->taskList, TaskQueueDestroyProc);
32869570cc8Sopenharmony_ci    OH_ListRemoveAll(&mgr->waitingTaskQueue, TaskQueueDestroyProc);
32969570cc8Sopenharmony_ci    OH_ListRemoveAll(&mgr->executingTaskQueue, TaskQueueDestroyProc);
33069570cc8Sopenharmony_ci    OH_ListRemoveAll(&mgr->executorQueue, TaskQueueDestroyProc);
33169570cc8Sopenharmony_ci    pthread_mutex_unlock(&mgr->mutex);
33269570cc8Sopenharmony_ci
33369570cc8Sopenharmony_ci    pthread_cond_destroy(&mgr->cond);
33469570cc8Sopenharmony_ci    pthread_mutex_destroy(&mgr->mutex);
33569570cc8Sopenharmony_ci    return 0;
33669570cc8Sopenharmony_ci}
33769570cc8Sopenharmony_ci
33869570cc8Sopenharmony_ciint ThreadMgrAddTask(ThreadMgr instance, ThreadTaskHandle *taskHandle)
33969570cc8Sopenharmony_ci{
34069570cc8Sopenharmony_ci    ThreadManager *mgr = (ThreadManager *)instance;
34169570cc8Sopenharmony_ci    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
34269570cc8Sopenharmony_ci    TaskNode *task = (TaskNode *)malloc(sizeof(TaskNode));
34369570cc8Sopenharmony_ci    APPSPAWN_CHECK(task != NULL, return -1, "Failed to create thread task");
34469570cc8Sopenharmony_ci
34569570cc8Sopenharmony_ci    task->context = NULL;
34669570cc8Sopenharmony_ci    task->finishProcess = NULL;
34769570cc8Sopenharmony_ci    task->totalTask = 0;
34869570cc8Sopenharmony_ci    atomic_init(&task->taskFlags, 0);
34969570cc8Sopenharmony_ci    atomic_init(&task->finishTaskCount, 0);
35069570cc8Sopenharmony_ci    OH_ListInit(&task->node);
35169570cc8Sopenharmony_ci    OH_ListInit(&task->executorList);
35269570cc8Sopenharmony_ci    pthread_mutex_init(&task->mutex, NULL);
35369570cc8Sopenharmony_ci    SetCondAttr(&task->cond);
35469570cc8Sopenharmony_ci
35569570cc8Sopenharmony_ci    pthread_mutex_lock(&mgr->mutex);
35669570cc8Sopenharmony_ci    task->taskId = mgr->currTaskId++;
35769570cc8Sopenharmony_ci    OH_ListAddTail(&mgr->taskList, &task->node);
35869570cc8Sopenharmony_ci    pthread_mutex_unlock(&mgr->mutex);
35969570cc8Sopenharmony_ci    *taskHandle = task->taskId;
36069570cc8Sopenharmony_ci    APPSPAWN_LOGV("Create thread task success task id: %{public}u", task->taskId);
36169570cc8Sopenharmony_ci    return 0;
36269570cc8Sopenharmony_ci}
36369570cc8Sopenharmony_ci
36469570cc8Sopenharmony_ciint ThreadMgrAddExecutor(ThreadMgr instance,
36569570cc8Sopenharmony_ci    ThreadTaskHandle taskHandle, TaskExecutor executor, const ThreadContext *context)
36669570cc8Sopenharmony_ci{
36769570cc8Sopenharmony_ci    ThreadManager *mgr = (ThreadManager *)instance;
36869570cc8Sopenharmony_ci    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
36969570cc8Sopenharmony_ci    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
37069570cc8Sopenharmony_ci    APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);
37169570cc8Sopenharmony_ci
37269570cc8Sopenharmony_ci    TaskExecuteNode *node = (TaskExecuteNode *)malloc(sizeof(TaskExecuteNode));
37369570cc8Sopenharmony_ci    APPSPAWN_CHECK(node != NULL, return -1, "Failed to create thread executor for task %{public}u", taskHandle);
37469570cc8Sopenharmony_ci    node->task = task;
37569570cc8Sopenharmony_ci    OH_ListInit(&node->node);
37669570cc8Sopenharmony_ci    OH_ListInit(&node->executeNode);
37769570cc8Sopenharmony_ci    node->context = context;
37869570cc8Sopenharmony_ci    node->executor = executor;
37969570cc8Sopenharmony_ci    task->totalTask++;
38069570cc8Sopenharmony_ci    OH_ListAddTail(&task->executorList, &node->node);
38169570cc8Sopenharmony_ci    return 0;
38269570cc8Sopenharmony_ci}
38369570cc8Sopenharmony_ci
38469570cc8Sopenharmony_ciint ThreadMgrCancelTask(ThreadMgr instance, ThreadTaskHandle taskHandle)
38569570cc8Sopenharmony_ci{
38669570cc8Sopenharmony_ci    ThreadManager *mgr = (ThreadManager *)instance;
38769570cc8Sopenharmony_ci    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
38869570cc8Sopenharmony_ci    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
38969570cc8Sopenharmony_ci    if (task != NULL) {
39069570cc8Sopenharmony_ci        SafeRemoveTask(mgr, task);
39169570cc8Sopenharmony_ci        DeleteTask(task);
39269570cc8Sopenharmony_ci        return 0;
39369570cc8Sopenharmony_ci    }
39469570cc8Sopenharmony_ci    task = GetTask(mgr, &mgr->waitingTaskQueue, taskHandle);
39569570cc8Sopenharmony_ci    if (task != NULL) {
39669570cc8Sopenharmony_ci        SafeRemoveTask(mgr, task);
39769570cc8Sopenharmony_ci        DeleteTask(task);
39869570cc8Sopenharmony_ci        return 0;
39969570cc8Sopenharmony_ci    }
40069570cc8Sopenharmony_ci    task = GetTask(mgr, &mgr->executingTaskQueue, taskHandle);
40169570cc8Sopenharmony_ci    if (task != NULL) {
40269570cc8Sopenharmony_ci        SafeRemoveTask(mgr, task);
40369570cc8Sopenharmony_ci        DeleteTask(task);
40469570cc8Sopenharmony_ci        return 0;
40569570cc8Sopenharmony_ci    }
40669570cc8Sopenharmony_ci    return 0;
40769570cc8Sopenharmony_ci}
40869570cc8Sopenharmony_ci
40969570cc8Sopenharmony_ciint TaskSyncExecute(ThreadMgr instance, ThreadTaskHandle taskHandle)
41069570cc8Sopenharmony_ci{
41169570cc8Sopenharmony_ci    ThreadManager *mgr = (ThreadManager *)instance;
41269570cc8Sopenharmony_ci    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
41369570cc8Sopenharmony_ci    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
41469570cc8Sopenharmony_ci    APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);
41569570cc8Sopenharmony_ci
41669570cc8Sopenharmony_ci    pthread_mutex_lock(&task->mutex);
41769570cc8Sopenharmony_ci    OH_ListRemove(&task->node);
41869570cc8Sopenharmony_ci    OH_ListInit(&task->node);
41969570cc8Sopenharmony_ci    OH_ListAddTail(&mgr->waitingTaskQueue, &task->node);
42069570cc8Sopenharmony_ci    pthread_cond_broadcast(&mgr->cond);
42169570cc8Sopenharmony_ci    pthread_mutex_unlock(&task->mutex);
42269570cc8Sopenharmony_ci    APPSPAWN_LOGV("TaskSyncExecute task: %{public}u", task->taskId);
42369570cc8Sopenharmony_ci    struct timespec abstime;
42469570cc8Sopenharmony_ci    int ret = 0;
42569570cc8Sopenharmony_ci    do {
42669570cc8Sopenharmony_ci        ConvertToTimespec(60 * 1000, &abstime);  // wait 60 * 1000 60s
42769570cc8Sopenharmony_ci        pthread_mutex_lock(&task->mutex);
42869570cc8Sopenharmony_ci        ret = pthread_cond_timedwait(&task->cond, &task->mutex, &abstime);
42969570cc8Sopenharmony_ci        pthread_mutex_unlock(&task->mutex);
43069570cc8Sopenharmony_ci        APPSPAWN_LOGV("TaskSyncExecute success task id: %{public}u ret: %{public}d", task->taskId, ret);
43169570cc8Sopenharmony_ci    } while (ret == ETIMEDOUT);
43269570cc8Sopenharmony_ci
43369570cc8Sopenharmony_ci    DeleteTask(task);
43469570cc8Sopenharmony_ci    return ret;
43569570cc8Sopenharmony_ci}
43669570cc8Sopenharmony_ci
43769570cc8Sopenharmony_ciint TaskExecute(ThreadMgr instance,
43869570cc8Sopenharmony_ci    ThreadTaskHandle taskHandle, TaskFinishProcessor process, const ThreadContext *context)
43969570cc8Sopenharmony_ci{
44069570cc8Sopenharmony_ci    ThreadManager *mgr = (ThreadManager *)instance;
44169570cc8Sopenharmony_ci    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
44269570cc8Sopenharmony_ci    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
44369570cc8Sopenharmony_ci    APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);
44469570cc8Sopenharmony_ci
44569570cc8Sopenharmony_ci    task->finishProcess = process;
44669570cc8Sopenharmony_ci    task->context = context;
44769570cc8Sopenharmony_ci    pthread_mutex_lock(&mgr->mutex);
44869570cc8Sopenharmony_ci    OH_ListRemove(&task->node);
44969570cc8Sopenharmony_ci    OH_ListInit(&task->node);
45069570cc8Sopenharmony_ci    OH_ListAddTail(&mgr->waitingTaskQueue, &task->node);
45169570cc8Sopenharmony_ci    pthread_cond_broadcast(&mgr->cond);
45269570cc8Sopenharmony_ci    pthread_mutex_unlock(&mgr->mutex);
45369570cc8Sopenharmony_ci    APPSPAWN_LOGV("TaskExecute task: %{public}u", task->taskId);
45469570cc8Sopenharmony_ci    return 0;
45569570cc8Sopenharmony_ci}
45669570cc8Sopenharmony_ci
45769570cc8Sopenharmony_cistatic void CheckAndCreateNewThread(ThreadManager *mgr)
45869570cc8Sopenharmony_ci{
45969570cc8Sopenharmony_ci    if (mgr->maxThreadCount <= mgr->validThreadCount) {
46069570cc8Sopenharmony_ci        return;
46169570cc8Sopenharmony_ci    }
46269570cc8Sopenharmony_ci    if (mgr->executorCount <= mgr->validThreadCount) {
46369570cc8Sopenharmony_ci        return;
46469570cc8Sopenharmony_ci    }
46569570cc8Sopenharmony_ci    APPSPAWN_LOGV("CheckAndCreateNewThread maxThreadCount: %{public}u validThreadCount: %{public}u %{public}u",
46669570cc8Sopenharmony_ci        mgr->maxThreadCount, mgr->validThreadCount, mgr->executorCount);
46769570cc8Sopenharmony_ci
46869570cc8Sopenharmony_ci    uint32_t totalThread = mgr->maxThreadCount;
46969570cc8Sopenharmony_ci    if (mgr->executorCount <= mgr->maxThreadCount) {
47069570cc8Sopenharmony_ci        totalThread = mgr->executorCount;
47169570cc8Sopenharmony_ci    }
47269570cc8Sopenharmony_ci
47369570cc8Sopenharmony_ci    for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
47469570cc8Sopenharmony_ci        if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
47569570cc8Sopenharmony_ci            continue;
47669570cc8Sopenharmony_ci        }
47769570cc8Sopenharmony_ci        int ret = pthread_create(&mgr->threadNode[index].threadId,
47869570cc8Sopenharmony_ci            NULL, ThreadExecute, (void *)&(mgr->threadNode[index]));
47969570cc8Sopenharmony_ci        APPSPAWN_CHECK(ret == 0, return, "Failed to create thread for %{public}u", index);
48069570cc8Sopenharmony_ci        APPSPAWN_LOGV("Create thread success index: %{public}u", mgr->threadNode[index].index);
48169570cc8Sopenharmony_ci        mgr->validThreadCount++;
48269570cc8Sopenharmony_ci        if (mgr->validThreadCount >= totalThread) {
48369570cc8Sopenharmony_ci            return;
48469570cc8Sopenharmony_ci        }
48569570cc8Sopenharmony_ci    }
48669570cc8Sopenharmony_ci    return;
48769570cc8Sopenharmony_ci}
48869570cc8Sopenharmony_ci
48969570cc8Sopenharmony_cistatic void *ManagerThreadProc(void *args)
49069570cc8Sopenharmony_ci{
49169570cc8Sopenharmony_ci    ThreadManager *mgr = g_threadManager;
49269570cc8Sopenharmony_ci    ThreadNode *threadNode = (ThreadNode *)args;
49369570cc8Sopenharmony_ci    struct timespec abstime;
49469570cc8Sopenharmony_ci    while (!threadNode->threadExit) {
49569570cc8Sopenharmony_ci        pthread_mutex_lock(&mgr->mutex);
49669570cc8Sopenharmony_ci        do {
49769570cc8Sopenharmony_ci            uint32_t timeout = 60 * 1000; // 60 * 1000 60s
49869570cc8Sopenharmony_ci            if (!ListEmpty(mgr->waitingTaskQueue)) {
49969570cc8Sopenharmony_ci                break;
50069570cc8Sopenharmony_ci            }
50169570cc8Sopenharmony_ci            if (!ListEmpty(mgr->executingTaskQueue)) {
50269570cc8Sopenharmony_ci                timeout = 500; // 500ms
50369570cc8Sopenharmony_ci            }
50469570cc8Sopenharmony_ci            ConvertToTimespec(timeout, &abstime);
50569570cc8Sopenharmony_ci            int ret = pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime);
50669570cc8Sopenharmony_ci            if (!ListEmpty(mgr->executingTaskQueue) || ret == ETIMEDOUT) {
50769570cc8Sopenharmony_ci                break;
50869570cc8Sopenharmony_ci            }
50969570cc8Sopenharmony_ci            if (threadNode->threadExit) {
51069570cc8Sopenharmony_ci                break;
51169570cc8Sopenharmony_ci            }
51269570cc8Sopenharmony_ci        } while (1);
51369570cc8Sopenharmony_ci        pthread_mutex_unlock(&mgr->mutex);
51469570cc8Sopenharmony_ci
51569570cc8Sopenharmony_ci        ExecuteTask(mgr);
51669570cc8Sopenharmony_ci        CheckAndCreateNewThread(mgr);
51769570cc8Sopenharmony_ci
51869570cc8Sopenharmony_ci        if (mgr->validThreadCount == 0) {
51969570cc8Sopenharmony_ci            RunExecutor(mgr, threadNode, 5); // 5 max thread
52069570cc8Sopenharmony_ci        }
52169570cc8Sopenharmony_ci        CheckTaskComplete(mgr);
52269570cc8Sopenharmony_ci    }
52369570cc8Sopenharmony_ci    return 0;
52469570cc8Sopenharmony_ci}
52569570cc8Sopenharmony_ci
52669570cc8Sopenharmony_cistatic void *ThreadExecute(void *args)
52769570cc8Sopenharmony_ci{
52869570cc8Sopenharmony_ci    ThreadManager *mgr = g_threadManager;
52969570cc8Sopenharmony_ci    ThreadNode *threadNode = (ThreadNode *)args;
53069570cc8Sopenharmony_ci    struct timespec abstime;
53169570cc8Sopenharmony_ci    while (!threadNode->threadExit) {
53269570cc8Sopenharmony_ci        pthread_mutex_lock(&mgr->mutex);
53369570cc8Sopenharmony_ci        while (ListEmpty(mgr->executorQueue) && !threadNode->threadExit) {
53469570cc8Sopenharmony_ci            ConvertToTimespec(60 * 1000, &abstime); // 60 * 1000 60s
53569570cc8Sopenharmony_ci            pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime);
53669570cc8Sopenharmony_ci        }
53769570cc8Sopenharmony_ci        pthread_mutex_unlock(&mgr->mutex);
53869570cc8Sopenharmony_ci        APPSPAWN_LOGV("bbbb threadNode->threadExit %{public}d", threadNode->threadExit);
53969570cc8Sopenharmony_ci        RunExecutor(mgr, threadNode, 1);
54069570cc8Sopenharmony_ci    }
54169570cc8Sopenharmony_ci    return NULL;
54269570cc8Sopenharmony_ci}
543