1/*
2 * Copyright (c) 2024-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#define HST_LOG_TAG "Task"
16#include "osal/task/task.h"
17#include "osal/task/taskInner.h"
18#include "osal/task/thread.h"
19#include "osal/utils/util.h"
20#include "cpp_ext/memory_ext.h"
21#include "common/log.h"
22
23#include <mutex>
24
25namespace {
26constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "PipelineTreadPool" };
27}
28
29namespace OHOS {
30namespace Media {
31namespace {
32    constexpr int64_t ADJUST_US = 500;
33    constexpr int64_t US_PER_MS = 1000;
34}
35
36static ThreadPriority ConvertPriorityType(TaskPriority priority)
37{
38    switch (priority) {
39        case TaskPriority::LOW:
40            return ThreadPriority::LOW;
41        case TaskPriority::NORMAL:
42            return ThreadPriority::NORMAL;
43        case TaskPriority::MIDDLE:
44            return ThreadPriority::MIDDLE;
45        case TaskPriority::HIGHEST:
46            return ThreadPriority::HIGHEST;
47        default:
48            return ThreadPriority::HIGH;
49    }
50}
51
52static std::string TaskTypeConvert(TaskType type)
53{
54    static const std::map<TaskType, std::string> table = {
55        {TaskType::GLOBAL, "G"},
56        {TaskType::VIDEO, "V"},
57        {TaskType::AUDIO, "A"},
58        {TaskType::SUBTITLE, "T"},
59        {TaskType::SINGLETON, "S"},
60    };
61    auto it = table.find(type);
62    if (it != table.end()) {
63        return it->second;
64    }
65    return "";
66}
67
68static int64_t GetNowUs()
69{
70    auto now = std::chrono::steady_clock::now();
71    return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
72}
73
74PipeLineThreadPool& PipeLineThreadPool::GetInstance()
75{
76    static PipeLineThreadPool instance;
77    return instance;
78}
79
80std::shared_ptr<PipeLineThread> PipeLineThreadPool::FindThread(std::string groupId, TaskType taskType, TaskPriority priority)
81{
82    AutoLock lock(mutex_);
83    if (workerGroupMap.find(groupId) == workerGroupMap.end()) {
84        workerGroupMap[groupId] = std::make_shared<std::list<std::shared_ptr<PipeLineThread>>>();
85    }
86    std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList = workerGroupMap[groupId];
87    for (auto thread : *threadList.get()) {
88        if (thread->type_ == taskType) {
89            return thread;
90        }
91    }
92    std::shared_ptr<PipeLineThread> newThread = std::make_shared<PipeLineThread>(groupId, taskType, priority);
93    threadList->push_back(newThread);
94    return newThread;
95}
96
97void PipeLineThreadPool::DestroyThread(std::string groupId)
98{
99    MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " destroy", groupId.c_str());
100    std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList;
101    {
102        AutoLock lock(mutex_);
103        if (workerGroupMap.find(groupId) == workerGroupMap.end()) {
104            MEDIA_LOG_E("PipeLineThread  not exist");
105            return;
106        }
107        threadList = workerGroupMap[groupId];
108        workerGroupMap.erase(groupId);
109    }
110    for (auto thread : *threadList.get()) {
111        thread->Exit();
112    }
113}
114
115PipeLineThread::PipeLineThread(std::string groupId, TaskType type, TaskPriority priority)
116    : groupId_(groupId), type_(type)
117{
118    MEDIA_LOG_I("PipeLineThread name:" PUBLIC_LOG_S " type:%{public}d created call", groupId_.c_str(), type);
119    loop_ = CppExt::make_unique<Thread>(ConvertPriorityType(priority));
120    name_ = groupId_ + "_" + TaskTypeConvert(type);
121    loop_->SetName(name_);
122    threadExit_ = false;
123    if (loop_->CreateThread([this] { Run(); })) {
124        threadExit_ = false;
125    } else {
126        threadExit_ = true;
127        loop_ = nullptr;
128        MEDIA_LOG_E("PipeLineThread " PUBLIC_LOG_S " create failed", name_.c_str());
129    }
130}
131
132PipeLineThread::~PipeLineThread()
133{
134    Exit();
135}
136
137void PipeLineThread::Exit()
138{
139    {
140        AutoLock lock(mutex_);
141        FALSE_RETURN_W(!threadExit_.load() && loop_);
142
143        MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " exit", name_.c_str());
144        threadExit_ = true;
145        syncCond_.NotifyAll();
146
147        // trigger to quit thread in current running thread, must not wait,
148        // or else the current thread will be suspended and can not quit.
149        if (IsRunningInSelf()) {
150            return;
151        }
152    }
153    // loop_ destroy will wait thread join
154    loop_ = nullptr;
155}
156
157void PipeLineThread::Run()
158{
159    MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " run", name_.c_str());
160    while (true) {
161        std::shared_ptr<TaskInner> nextTask;
162        {
163            AutoLock lock(mutex_);
164            if (threadExit_.load()) {
165                break;
166            }
167            int64_t nextJobUs = INT64_MAX;
168            for (auto task: taskList_) {
169                int64_t taskJobUs = task->NextJobUs();
170                if (taskJobUs == -1) {
171                    continue;
172                }
173                if (taskJobUs < nextJobUs) {
174                    nextJobUs = taskJobUs;
175                    nextTask = task;
176                }
177            }
178            if (nextTask == nullptr) {
179                syncCond_.Wait(lock);
180                continue;
181            }
182            int64_t nowUs = GetNowUs();
183            if (nextJobUs > (nowUs + ADJUST_US)) {
184                syncCond_.WaitFor(lock, (nextJobUs - nowUs + ADJUST_US) / US_PER_MS);
185                continue;
186            }
187        }
188        nextTask->HandleJob();
189    }
190}
191
192void PipeLineThread::AddTask(std::shared_ptr<TaskInner> task)
193{
194    AutoLock lock(mutex_);
195    taskList_.push_back(task);
196}
197
198void PipeLineThread::RemoveTask(std::shared_ptr<TaskInner> task)
199{
200    {
201        AutoLock lock(mutex_);
202        taskList_.remove(task);
203        FALSE_LOG_MSG(!taskList_.empty(),
204         "PipeLineThread " PUBLIC_LOG_S " remove all Task", name_.c_str());
205    }
206    if (type_ == TaskType::SINGLETON) {
207        PipeLineThreadPool::GetInstance().DestroyThread(name_);
208    }
209}
210
211void PipeLineThread::LockJobState()
212{
213    if (IsRunningInSelf()) {
214        return;
215    }
216    mutex_.lock();
217}
218
219void PipeLineThread::UnLockJobState(bool notifyChange)
220{
221    if (IsRunningInSelf()) {
222        return;
223    }
224    if (notifyChange) {
225        syncCond_.NotifyAll();
226    }
227    mutex_.unlock();
228}
229
230bool PipeLineThread::IsRunningInSelf()
231{
232    return loop_ ? loop_->IsRunningInSelf() : false;
233}
234} // namespace Media
235} // namespace OHOS
236