1/*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#include "thread_manager.h"
16
17#include <errno.h>
18#include <pthread.h>
19#include <stdatomic.h>
20
21#include "appspawn_utils.h"
22#include "list.h"
23
24typedef struct {
25    atomic_uint threadExit;
26    uint32_t index;
27    pthread_t threadId;
28} ThreadNode;
29
30typedef struct {
31    pthread_mutex_t mutex;        // 保护执行队列
32    pthread_cond_t cond;          // 线程等待条件
33    ListNode taskList;            // 任务队列,任务还没有启动
34    ListNode waitingTaskQueue;    // 启动的任务,排队等待执行
35    ListNode executingTaskQueue;  // 正在执行
36    ListNode executorQueue;       // 执行节点,保存 TaskExecuteNode
37    uint32_t executorCount;
38    uint32_t maxThreadCount;
39    uint32_t currTaskId;
40    struct timespec lastAdjust;
41    uint32_t validThreadCount;
42    ThreadNode threadNode[1];  // 线程信息,控制线程的退出和结束
43} ThreadManager;
44
45typedef struct {
46    uint32_t taskId;
47    ListNode node;
48    ListNode executorList;
49    uint32_t totalTask;
50    atomic_uint taskFlags;  // 表示任务是否被取消,各线程检查后决定任务线程是否结束
51    atomic_uint finishTaskCount;
52    const ThreadContext *context;
53    TaskFinishProcessor finishProcess;
54    pthread_mutex_t mutex;  // 保护执行队列
55    pthread_cond_t cond;    // 同步执行时,等待确认
56} TaskNode;
57
58typedef struct {
59    ListNode node;         // 保存sub task到对应的task,方便管理
60    ListNode executeNode;  // 等待处理的任务节点
61    TaskNode *task;
62    const ThreadContext *context;
63    TaskExecutor executor;
64} TaskExecuteNode;
65
66static ThreadManager *g_threadManager = NULL;
67
68static void *ManagerThreadProc(void *args);
69static void *ThreadExecute(void *args);
70
71static void SetCondAttr(pthread_cond_t *cond)
72{
73    pthread_condattr_t attr;
74    pthread_condattr_init(&attr);
75    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
76    pthread_cond_init(cond, &attr);
77    pthread_condattr_destroy(&attr);
78}
79
80static void ConvertToTimespec(int time, struct timespec *tm)
81{
82    struct timespec start;
83    clock_gettime(CLOCK_MONOTONIC, &start);
84    uint64_t ns = time;
85    ns *= APPSPAWN_MSEC_TO_NSEC;
86    ns += start.tv_sec * APPSPAWN_SEC_TO_NSEC + start.tv_nsec;
87    tm->tv_sec = ns / APPSPAWN_SEC_TO_NSEC;
88    tm->tv_nsec = ns % APPSPAWN_SEC_TO_NSEC;
89}
90
91static TaskExecuteNode *PopTaskExecutor(ThreadManager *mgr)
92{
93    TaskExecuteNode *executor = NULL;
94    pthread_mutex_lock(&mgr->mutex);
95    ListNode *node = mgr->executorQueue.next;
96    if (node != &mgr->executorQueue) {
97        OH_ListRemove(node);
98        OH_ListInit(node);
99        executor = ListEntry(node, TaskExecuteNode, executeNode);
100        mgr->executorCount--;
101    }
102    pthread_mutex_unlock(&mgr->mutex);
103    return executor;
104}
105
106static int AddExecutor(ThreadManager *mgr, const TaskNode *task)
107{
108    ListNode *node = task->executorList.next;
109    while (node != &task->executorList) {
110        TaskExecuteNode *executor = ListEntry(node, TaskExecuteNode, node);
111        APPSPAWN_LOGV("AddExecutor task: %{public}u executorCount: %{public}u executor: %{public}u",
112            task->taskId, mgr->executorCount, executor->task->taskId);
113
114        // 插入尾部执行
115        pthread_mutex_lock(&mgr->mutex);
116        OH_ListRemove(&executor->executeNode);
117        OH_ListInit(&executor->executeNode);
118        OH_ListAddTail(&mgr->executorQueue, &executor->executeNode);
119        mgr->executorCount++;
120        pthread_mutex_unlock(&mgr->mutex);
121
122        node = node->next;
123    }
124    return 0;
125}
126
127static void RunExecutor(ThreadManager *mgr, ThreadNode *threadNode, uint32_t maxCount)
128{
129    APPSPAWN_LOGV("RunExecutor in thread: %{public}d executorCount: %{public}u ",
130        threadNode->index, mgr->executorCount);
131    TaskExecuteNode *executor = PopTaskExecutor(mgr);
132    uint32_t count = 0;
133    while (executor != NULL && !threadNode->threadExit) {
134        APPSPAWN_LOGV("RunExecutor task: %{public}u", executor->task->taskId);
135        atomic_fetch_add(&executor->task->finishTaskCount, 1);
136        executor->executor(executor->task->taskId, executor->context);
137        count++;
138        if (count >= maxCount) {
139            break;
140        }
141        executor = PopTaskExecutor(mgr);
142    }
143    APPSPAWN_LOGV("RunExecutor executorCount: %{public}u end", mgr->executorCount);
144}
145
146static int TaskCompareTaskId(ListNode *node, void *data)
147{
148    TaskNode *task = ListEntry(node, TaskNode, node);
149    return task->taskId - *(uint32_t *)data;
150}
151
152static TaskNode *GetTask(ThreadManager *mgr, ListNode *queue, uint32_t taskId)
153{
154    ListNode *node = NULL;
155    pthread_mutex_lock(&mgr->mutex);
156    node = OH_ListFind(queue, &taskId, TaskCompareTaskId);
157    pthread_mutex_unlock(&mgr->mutex);
158    if (node == NULL) {
159        return NULL;
160    }
161    return ListEntry(node, TaskNode, node);
162}
163
164static void DeleteTask(TaskNode *task)
165{
166    APPSPAWN_LOGV("DeleteTask task: %{public}u ", task->taskId);
167
168    if (!ListEmpty(task->node)) {
169        return;
170    }
171    OH_ListRemoveAll(&task->executorList, NULL);
172    pthread_cond_destroy(&task->cond);
173    pthread_mutex_destroy(&task->mutex);
174    free(task);
175}
176
177static TaskNode *PopTask(ThreadManager *mgr, ListNode *queue)
178{
179    TaskNode *task = NULL;
180    pthread_mutex_lock(&mgr->mutex);
181    ListNode *node = queue->next;
182    if (node != queue) {
183        OH_ListRemove(node);
184        OH_ListInit(node);
185        task = ListEntry(node, TaskNode, node);
186    }
187    pthread_mutex_unlock(&mgr->mutex);
188    return task;
189}
190
191static void PushTask(ThreadManager *mgr, TaskNode *task, ListNode *queue)
192{
193    pthread_mutex_lock(&mgr->mutex);
194    OH_ListAddTail(queue, &task->node);
195    pthread_cond_broadcast(&mgr->cond);
196    pthread_mutex_unlock(&mgr->mutex);
197}
198
199static void SafeRemoveTask(ThreadManager *mgr, TaskNode *task)
200{
201    pthread_mutex_lock(&mgr->mutex);
202    OH_ListRemove(&task->node);
203    OH_ListInit(&task->node);
204    pthread_mutex_unlock(&mgr->mutex);
205
206    ListNode *node = task->executorList.next;
207    while (node != &task->executorList) {
208        OH_ListRemove(node);
209        OH_ListInit(node);
210        TaskExecuteNode *executor = ListEntry(node, TaskExecuteNode, node);
211        pthread_mutex_lock(&mgr->mutex);
212        if (!ListEmpty(executor->executeNode)) {
213            OH_ListRemove(&executor->executeNode);
214            OH_ListInit(&executor->executeNode);
215            mgr->executorCount--;
216        }
217        pthread_mutex_unlock(&mgr->mutex);
218        free(executor);
219
220        node = task->executorList.next;
221    }
222}
223
224static void ExecuteTask(ThreadManager *mgr)
225{
226    TaskNode *task = PopTask(mgr, &mgr->waitingTaskQueue);
227    if (task == NULL) {
228        return;
229    }
230
231    APPSPAWN_LOGV("ExecuteTask task: %{public}u ", task->taskId);
232    AddExecutor(mgr, task);
233    PushTask(mgr, task, &mgr->executingTaskQueue);
234    return;
235}
236
237static void CheckTaskComplete(ThreadManager *mgr)
238{
239    TaskNode *task = PopTask(mgr, &mgr->executingTaskQueue);
240    if (task == NULL) {
241        return;
242    }
243    if (task->totalTask <= atomic_load(&task->finishTaskCount)) {
244        if (task->finishProcess != NULL) {
245            task->finishProcess(task->taskId, task->context);
246            DeleteTask(task);
247            return;
248        }
249        pthread_mutex_lock(&task->mutex);
250        pthread_cond_signal(&task->cond);
251        pthread_mutex_unlock(&task->mutex);
252        return;
253    }
254    PushTask(mgr, task, &mgr->executingTaskQueue);
255    return;
256}
257
258static void TaskQueueDestroyProc(ListNode *node)
259{
260    OH_ListRemove(node);
261    TaskNode *task = ListEntry(node, TaskNode, node);
262    DeleteTask(task);
263}
264
265int CreateThreadMgr(uint32_t maxThreadCount, ThreadMgr *instance)
266{
267    if (g_threadManager != NULL) {
268        *instance = (ThreadMgr)g_threadManager;
269        return 0;
270    }
271
272    ThreadManager *mgr = (ThreadManager *)malloc(sizeof(ThreadManager) + maxThreadCount * sizeof(ThreadNode));
273    APPSPAWN_CHECK(mgr != NULL, return -1, "Failed to create thread manager");
274
275    mgr->executorCount = 0;
276    mgr->currTaskId = 0;
277    mgr->validThreadCount = 0;
278    mgr->maxThreadCount = maxThreadCount;
279    OH_ListInit(&mgr->taskList);
280    OH_ListInit(&mgr->waitingTaskQueue);
281    OH_ListInit(&mgr->executingTaskQueue);
282    OH_ListInit(&mgr->executorQueue);
283    pthread_mutex_init(&mgr->mutex, NULL);
284    SetCondAttr(&mgr->cond);
285
286    for (uint32_t index = 0; index < maxThreadCount + 1; index++) {
287        mgr->threadNode[index].index = index;
288        mgr->threadNode[index].threadId = INVALID_THREAD_ID;
289        atomic_init(&mgr->threadNode[index].threadExit, 0);
290    }
291    g_threadManager = mgr;
292    int ret = pthread_create(&mgr->threadNode[0].threadId, NULL, ManagerThreadProc, (void *)&mgr->threadNode[0]);
293    if (ret != 0) {
294        APPSPAWN_LOGE("Failed to create thread for manager");
295        g_threadManager = NULL;
296        free(mgr);
297        return -1;
298    }
299    *instance = (ThreadMgr)mgr;
300    APPSPAWN_LOGV("Create thread manager success maxThreadCount: %{public}u", maxThreadCount);
301    return 0;
302}
303
304int DestroyThreadMgr(ThreadMgr instance)
305{
306    APPSPAWN_LOGV("DestroyThreadMgr");
307    ThreadManager *mgr = (ThreadManager *)instance;
308    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
309
310    for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
311        if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
312            atomic_store(&mgr->threadNode[index].threadExit, 1);
313            APPSPAWN_LOGV("DestroyThreadMgr index %{public}d %{public}d", index, mgr->threadNode[index].threadExit);
314        }
315    }
316    pthread_mutex_lock(&mgr->mutex);
317    pthread_cond_broadcast(&mgr->cond);
318    pthread_mutex_unlock(&mgr->mutex);
319    for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
320        if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
321            pthread_join(mgr->threadNode[index].threadId, NULL);
322            APPSPAWN_LOGV("DestroyThreadMgr index %{public}d end", index);
323        }
324    }
325
326    pthread_mutex_lock(&mgr->mutex);
327    OH_ListRemoveAll(&mgr->taskList, TaskQueueDestroyProc);
328    OH_ListRemoveAll(&mgr->waitingTaskQueue, TaskQueueDestroyProc);
329    OH_ListRemoveAll(&mgr->executingTaskQueue, TaskQueueDestroyProc);
330    OH_ListRemoveAll(&mgr->executorQueue, TaskQueueDestroyProc);
331    pthread_mutex_unlock(&mgr->mutex);
332
333    pthread_cond_destroy(&mgr->cond);
334    pthread_mutex_destroy(&mgr->mutex);
335    return 0;
336}
337
338int ThreadMgrAddTask(ThreadMgr instance, ThreadTaskHandle *taskHandle)
339{
340    ThreadManager *mgr = (ThreadManager *)instance;
341    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
342    TaskNode *task = (TaskNode *)malloc(sizeof(TaskNode));
343    APPSPAWN_CHECK(task != NULL, return -1, "Failed to create thread task");
344
345    task->context = NULL;
346    task->finishProcess = NULL;
347    task->totalTask = 0;
348    atomic_init(&task->taskFlags, 0);
349    atomic_init(&task->finishTaskCount, 0);
350    OH_ListInit(&task->node);
351    OH_ListInit(&task->executorList);
352    pthread_mutex_init(&task->mutex, NULL);
353    SetCondAttr(&task->cond);
354
355    pthread_mutex_lock(&mgr->mutex);
356    task->taskId = mgr->currTaskId++;
357    OH_ListAddTail(&mgr->taskList, &task->node);
358    pthread_mutex_unlock(&mgr->mutex);
359    *taskHandle = task->taskId;
360    APPSPAWN_LOGV("Create thread task success task id: %{public}u", task->taskId);
361    return 0;
362}
363
364int ThreadMgrAddExecutor(ThreadMgr instance,
365    ThreadTaskHandle taskHandle, TaskExecutor executor, const ThreadContext *context)
366{
367    ThreadManager *mgr = (ThreadManager *)instance;
368    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
369    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
370    APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);
371
372    TaskExecuteNode *node = (TaskExecuteNode *)malloc(sizeof(TaskExecuteNode));
373    APPSPAWN_CHECK(node != NULL, return -1, "Failed to create thread executor for task %{public}u", taskHandle);
374    node->task = task;
375    OH_ListInit(&node->node);
376    OH_ListInit(&node->executeNode);
377    node->context = context;
378    node->executor = executor;
379    task->totalTask++;
380    OH_ListAddTail(&task->executorList, &node->node);
381    return 0;
382}
383
384int ThreadMgrCancelTask(ThreadMgr instance, ThreadTaskHandle taskHandle)
385{
386    ThreadManager *mgr = (ThreadManager *)instance;
387    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
388    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
389    if (task != NULL) {
390        SafeRemoveTask(mgr, task);
391        DeleteTask(task);
392        return 0;
393    }
394    task = GetTask(mgr, &mgr->waitingTaskQueue, taskHandle);
395    if (task != NULL) {
396        SafeRemoveTask(mgr, task);
397        DeleteTask(task);
398        return 0;
399    }
400    task = GetTask(mgr, &mgr->executingTaskQueue, taskHandle);
401    if (task != NULL) {
402        SafeRemoveTask(mgr, task);
403        DeleteTask(task);
404        return 0;
405    }
406    return 0;
407}
408
409int TaskSyncExecute(ThreadMgr instance, ThreadTaskHandle taskHandle)
410{
411    ThreadManager *mgr = (ThreadManager *)instance;
412    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
413    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
414    APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);
415
416    pthread_mutex_lock(&task->mutex);
417    OH_ListRemove(&task->node);
418    OH_ListInit(&task->node);
419    OH_ListAddTail(&mgr->waitingTaskQueue, &task->node);
420    pthread_cond_broadcast(&mgr->cond);
421    pthread_mutex_unlock(&task->mutex);
422    APPSPAWN_LOGV("TaskSyncExecute task: %{public}u", task->taskId);
423    struct timespec abstime;
424    int ret = 0;
425    do {
426        ConvertToTimespec(60 * 1000, &abstime);  // wait 60 * 1000 60s
427        pthread_mutex_lock(&task->mutex);
428        ret = pthread_cond_timedwait(&task->cond, &task->mutex, &abstime);
429        pthread_mutex_unlock(&task->mutex);
430        APPSPAWN_LOGV("TaskSyncExecute success task id: %{public}u ret: %{public}d", task->taskId, ret);
431    } while (ret == ETIMEDOUT);
432
433    DeleteTask(task);
434    return ret;
435}
436
437int TaskExecute(ThreadMgr instance,
438    ThreadTaskHandle taskHandle, TaskFinishProcessor process, const ThreadContext *context)
439{
440    ThreadManager *mgr = (ThreadManager *)instance;
441    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
442    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
443    APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);
444
445    task->finishProcess = process;
446    task->context = context;
447    pthread_mutex_lock(&mgr->mutex);
448    OH_ListRemove(&task->node);
449    OH_ListInit(&task->node);
450    OH_ListAddTail(&mgr->waitingTaskQueue, &task->node);
451    pthread_cond_broadcast(&mgr->cond);
452    pthread_mutex_unlock(&mgr->mutex);
453    APPSPAWN_LOGV("TaskExecute task: %{public}u", task->taskId);
454    return 0;
455}
456
457static void CheckAndCreateNewThread(ThreadManager *mgr)
458{
459    if (mgr->maxThreadCount <= mgr->validThreadCount) {
460        return;
461    }
462    if (mgr->executorCount <= mgr->validThreadCount) {
463        return;
464    }
465    APPSPAWN_LOGV("CheckAndCreateNewThread maxThreadCount: %{public}u validThreadCount: %{public}u %{public}u",
466        mgr->maxThreadCount, mgr->validThreadCount, mgr->executorCount);
467
468    uint32_t totalThread = mgr->maxThreadCount;
469    if (mgr->executorCount <= mgr->maxThreadCount) {
470        totalThread = mgr->executorCount;
471    }
472
473    for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
474        if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
475            continue;
476        }
477        int ret = pthread_create(&mgr->threadNode[index].threadId,
478            NULL, ThreadExecute, (void *)&(mgr->threadNode[index]));
479        APPSPAWN_CHECK(ret == 0, return, "Failed to create thread for %{public}u", index);
480        APPSPAWN_LOGV("Create thread success index: %{public}u", mgr->threadNode[index].index);
481        mgr->validThreadCount++;
482        if (mgr->validThreadCount >= totalThread) {
483            return;
484        }
485    }
486    return;
487}
488
489static void *ManagerThreadProc(void *args)
490{
491    ThreadManager *mgr = g_threadManager;
492    ThreadNode *threadNode = (ThreadNode *)args;
493    struct timespec abstime;
494    while (!threadNode->threadExit) {
495        pthread_mutex_lock(&mgr->mutex);
496        do {
497            uint32_t timeout = 60 * 1000; // 60 * 1000 60s
498            if (!ListEmpty(mgr->waitingTaskQueue)) {
499                break;
500            }
501            if (!ListEmpty(mgr->executingTaskQueue)) {
502                timeout = 500; // 500ms
503            }
504            ConvertToTimespec(timeout, &abstime);
505            int ret = pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime);
506            if (!ListEmpty(mgr->executingTaskQueue) || ret == ETIMEDOUT) {
507                break;
508            }
509            if (threadNode->threadExit) {
510                break;
511            }
512        } while (1);
513        pthread_mutex_unlock(&mgr->mutex);
514
515        ExecuteTask(mgr);
516        CheckAndCreateNewThread(mgr);
517
518        if (mgr->validThreadCount == 0) {
519            RunExecutor(mgr, threadNode, 5); // 5 max thread
520        }
521        CheckTaskComplete(mgr);
522    }
523    return 0;
524}
525
526static void *ThreadExecute(void *args)
527{
528    ThreadManager *mgr = g_threadManager;
529    ThreadNode *threadNode = (ThreadNode *)args;
530    struct timespec abstime;
531    while (!threadNode->threadExit) {
532        pthread_mutex_lock(&mgr->mutex);
533        while (ListEmpty(mgr->executorQueue) && !threadNode->threadExit) {
534            ConvertToTimespec(60 * 1000, &abstime); // 60 * 1000 60s
535            pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime);
536        }
537        pthread_mutex_unlock(&mgr->mutex);
538        APPSPAWN_LOGV("bbbb threadNode->threadExit %{public}d", threadNode->threadExit);
539        RunExecutor(mgr, threadNode, 1);
540    }
541    return NULL;
542}
543