1/*
2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#define MEDIA_TASK_THREAD
16#define HST_LOG_TAG "Task"
17#include "osal/task/task.h"
18#include "osal/task/taskInner.h"
19#include "osal/task/thread.h"
20#include "osal/task/pipeline_threadpool.h"
21#include "osal/utils/util.h"
22#include "cpp_ext/memory_ext.h"
23#include "common/log.h"
24
25#include <mutex>
26
27namespace {
28constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "TaskInner" };
29}
30
31namespace OHOS {
32namespace Media {
33namespace {
34    constexpr int64_t INVALID_DELAY_TIME_US = 10000000; // 10s
35}
36static std::atomic<int32_t> singletonTaskId = 0;
37
38void TaskInner::SleepInTask(unsigned ms)
39{
40    OSAL::SleepFor(ms);
41}
42
43static int64_t GetNowUs()
44{
45    auto now = std::chrono::steady_clock::now();
46    return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
47}
48
49TaskInner::TaskInner(const std::string& name, const std::string& groupId, TaskType type, TaskPriority priority,
50    bool singleLoop)
51    : name_(std::move(name)), runningState_(RunningState::PAUSED), singleLoop_(singleLoop)
52{
53    MEDIA_LOG_I(">> " PUBLIC_LOG_S " groupId:" PUBLIC_LOG_S " type:%{public}d ctor",
54        name_.c_str(), groupId.c_str(), type);
55    if (type == TaskType::SINGLETON) {
56        std::string newName = name_ + std::to_string(++singletonTaskId);
57        pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(newName, type, priority);
58    } else {
59        pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(groupId, type, priority);
60    }
61}
62
63void TaskInner::Init()
64{
65    MEDIA_LOG_I(">> " PUBLIC_LOG_S " Init", name_.c_str());
66    pipelineThread_->AddTask(shared_from_this());
67}
68
69void TaskInner::DeInit()
70{
71    MEDIA_LOG_I(PUBLIC_LOG_S " DeInit", name_.c_str());
72    pipelineThread_->RemoveTask(shared_from_this());
73    {
74        AutoLock lock1(jobMutex_);
75        AutoLock lock2(stateMutex_);
76        runningState_ = RunningState::STOPPED;
77        topProcessUs_ = -1;
78    }
79    MEDIA_LOG_I(PUBLIC_LOG_S " DeInit done", name_.c_str());
80}
81
82TaskInner::~TaskInner()
83{
84    MEDIA_LOG_D(PUBLIC_LOG_S " dtor", name_.c_str());
85}
86
87void TaskInner::UpdateDelayTime(int64_t delayUs)
88{
89    if (!singleLoop_) {
90        MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime do nothing", name_.c_str());
91        return;
92    }
93    MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime enter topProcessUs:" PUBLIC_LOG_D64
94        ", delayUs:" PUBLIC_LOG_D64, name_.c_str(), topProcessUs_, delayUs);
95    pipelineThread_->LockJobState();
96    AutoLock lock(stateMutex_);
97    if (runningState_ != RunningState::STARTED) {
98        pipelineThread_->UnLockJobState(false);
99        return;
100    }
101    topProcessUs_ = GetNowUs() + delayUs;
102    pipelineThread_->UnLockJobState(true);
103    MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime exit topProcessUs:" PUBLIC_LOG_D64,
104        name_.c_str(), topProcessUs_);
105}
106
107void TaskInner::Start()
108{
109    MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Start", name_.c_str());
110    pipelineThread_->LockJobState();
111    AutoLock lock(stateMutex_);
112    runningState_ = RunningState::STARTED;
113    if (singleLoop_) {
114        if (!job_) {
115            MEDIA_LOG_D("task " PUBLIC_LOG_S " Start, job invalid", name_.c_str());
116        }
117        topProcessUs_ = GetNowUs();
118    } else {
119        UpdateTop();
120    }
121    pipelineThread_->UnLockJobState(true);
122    MEDIA_LOG_I_FALSE_D(isStateLogEnabled_.load(), "task " PUBLIC_LOG_S " Start done", name_.c_str());
123}
124
125void TaskInner::Stop()
126{
127    if (pipelineThread_->IsRunningInSelf()) {
128        MEDIA_LOG_W(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
129        runningState_ = RunningState::STOPPED;
130        topProcessUs_ = -1;
131        return;
132    }
133    MEDIA_LOG_I(">> " PUBLIC_LOG_S " Stop", name_.c_str());
134    AutoLock lock1(jobMutex_);
135    pipelineThread_->LockJobState();
136    AutoLock lock2(stateMutex_);
137    if (runningState_.load() == RunningState::STOPPED) {
138        pipelineThread_->UnLockJobState(false);
139        return;
140    }
141    runningState_ = RunningState::STOPPED;
142    topProcessUs_ = -1;
143    pipelineThread_->UnLockJobState(true);
144    MEDIA_LOG_I(PUBLIC_LOG_S " Stop <<", name_.c_str());
145}
146
147void TaskInner::StopAsync()
148{
149    if (pipelineThread_->IsRunningInSelf()) {
150        MEDIA_LOG_W(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
151        runningState_ = RunningState::STOPPED;
152        topProcessUs_ = -1;
153        return;
154    }
155    MEDIA_LOG_I(PUBLIC_LOG_S " StopAsync", name_.c_str());
156    pipelineThread_->LockJobState();
157    AutoLock lock(stateMutex_);
158    bool stateChanged = false;
159    if (runningState_.load() != RunningState::STOPPED) {
160        runningState_ = RunningState::STOPPED;
161        topProcessUs_ = -1;
162        stateChanged = true;
163    }
164    pipelineThread_->UnLockJobState(stateChanged);
165}
166
167void TaskInner::Pause()
168{
169    if (pipelineThread_->IsRunningInSelf()) {
170        RunningState state = runningState_.load();
171        if (state == RunningState::STARTED) {
172            MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
173                PUBLIC_LOG_S " Pause done in self task", name_.c_str());
174            runningState_ = RunningState::PAUSED;
175            topProcessUs_ = -1;
176            return;
177        } else {
178            MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
179                PUBLIC_LOG_S " Pause skip in self task, curret State: " PUBLIC_LOG_D32, name_.c_str(), state);
180            return;
181        }
182    }
183    MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause", name_.c_str());
184    AutoLock lock1(jobMutex_);
185    pipelineThread_->LockJobState();
186    AutoLock lock2(stateMutex_);
187    RunningState state = runningState_.load();
188    if (state != RunningState::STARTED) {
189        pipelineThread_->UnLockJobState(false);
190        return;
191    }
192    runningState_ = RunningState::PAUSED;
193    topProcessUs_ = -1;
194    pipelineThread_->UnLockJobState(true);
195    MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause done.", name_.c_str());
196}
197
198// There is no need to perform notification, as no call would wait for PAUSING state.
199// If perform notification may cause unnecessasy running when the task is already in PAUSED state.
200void TaskInner::PauseAsync()
201{
202    if (pipelineThread_->IsRunningInSelf()) {
203        RunningState state = runningState_.load();
204        if (state == RunningState::STARTED) {
205            MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
206                PUBLIC_LOG_S " PauseAsync done in self task", name_.c_str());
207            runningState_ = RunningState::PAUSED;
208            topProcessUs_ = -1;
209            return;
210        } else {
211            MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
212                PUBLIC_LOG_S " PauseAsync skip in self task, curretState:%{public}d", name_.c_str(), state);
213            return;
214        }
215    }
216    MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " PauseAsync", name_.c_str());
217    pipelineThread_->LockJobState();
218    AutoLock lock(stateMutex_);
219    bool stateChanged = false;
220    if (runningState_.load() == RunningState::STARTED) {
221        runningState_ = RunningState::PAUSED;
222        topProcessUs_ = -1;
223        stateChanged = true;
224    }
225    pipelineThread_->UnLockJobState(stateChanged);
226}
227
228void TaskInner::RegisterJob(const std::function<int64_t()>& job)
229{
230    MEDIA_LOG_I(PUBLIC_LOG_S " RegisterHandler", name_.c_str());
231    job_ = std::move(job);
232}
233
234void TaskInner::SubmitJobOnce(const std::function<void()>& job, int64_t delayUs, bool wait)
235{
236    MEDIA_LOG_D(PUBLIC_LOG_S " SubmitJobOnce", name_.c_str());
237    int64_t time = InsertJob(job, delayUs, false);
238    if (wait) {
239        AutoLock lock(stateMutex_);
240        replyCond_.Wait(lock, [this, time] { return msgQueue_.find(time) == msgQueue_.end(); });
241    }
242}
243
244void TaskInner::SubmitJob(const std::function<void()>& job, int64_t delayUs, bool wait)
245{
246    MEDIA_LOG_D(PUBLIC_LOG_S " SubmitJob delayUs:%{public}" PRId64, name_.c_str(), delayUs);
247    int64_t time = InsertJob(job, delayUs, true);
248    if (wait) {
249        AutoLock lock(stateMutex_);
250        replyCond_.Wait(lock, [this, time] { return jobQueue_.find(time) == jobQueue_.end(); });
251    }
252}
253
254void TaskInner::UpdateTop()
255{
256    if (msgQueue_.empty() && ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty())) {
257        topProcessUs_ = -1;
258        return;
259    }
260    if (msgQueue_.empty()) {
261        topProcessUs_ = jobQueue_.begin()->first;
262        topIsJob_ = true;
263    } else if ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty()) {
264        topProcessUs_ = msgQueue_.begin()->first;
265        topIsJob_ = false;
266    } else {
267        int64_t msgProcessTime = msgQueue_.begin()->first;
268        int64_t jobProcessTime = jobQueue_.begin()->first;
269        int64_t nowUs =  GetNowUs();
270        if (msgProcessTime <= nowUs || msgProcessTime <= jobProcessTime) {
271            topProcessUs_ = msgProcessTime;
272            topIsJob_ = false;
273        } else  {
274            topProcessUs_ = jobProcessTime;
275            topIsJob_ = true;
276        }
277    }
278}
279
280int64_t TaskInner::NextJobUs()
281{
282    AutoLock lock(stateMutex_);
283    return topProcessUs_;
284}
285
286void TaskInner::HandleJob()
287{
288    AutoLock lock(jobMutex_);
289    if (singleLoop_) {
290        stateMutex_.lock();
291        int64_t currentTopProcessUs = topProcessUs_;
292        if (runningState_.load() == RunningState::PAUSED || runningState_.load() == RunningState::STOPPED) {
293            topProcessUs_ = -1;
294            stateMutex_.unlock();
295            return;
296        }
297        stateMutex_.unlock();
298        int64_t nextDelay = (!job_) ? INVALID_DELAY_TIME_US : job_();
299
300        AutoLock lock(stateMutex_);
301        // if topProcessUs_ is -1, we already pause/stop in job_()
302        // if topProcessUs_ is changed, we should ignore the returned delay time.
303        if (topProcessUs_ != -1 && currentTopProcessUs == topProcessUs_) {
304            topProcessUs_ = GetNowUs() + nextDelay;
305        }
306    } else {
307        std::function<void()> nextJob;
308        stateMutex_.lock();
309        if (topIsJob_) {
310            nextJob = std::move(jobQueue_.begin()->second);
311            jobQueue_.erase(jobQueue_.begin());
312        } else {
313            nextJob = std::move(msgQueue_.begin()->second);
314            msgQueue_.erase(msgQueue_.begin());
315        }
316        {
317            stateMutex_.unlock();
318            nextJob();
319            replyCond_.NotifyAll();
320        }
321        AutoLock lock(stateMutex_);
322        UpdateTop();
323    }
324}
325
326int64_t TaskInner::InsertJob(const std::function<void()>& job, int64_t delayUs, bool inJobQueue)
327{
328    pipelineThread_->LockJobState();
329    AutoLock lock(stateMutex_);
330    int64_t nowUs = GetNowUs();
331    if (delayUs < 0) {
332        delayUs = 0;
333    }
334    int64_t processTime = nowUs + delayUs;
335    if (inJobQueue) {
336        while (jobQueue_.find(processTime) != jobQueue_.end()) { // To prevent dropping job unexpectedly
337            MEDIA_LOG_W("DUPLICATIVE jobQueue_ TIMESTAMP!!!");
338            processTime++;
339        }
340        jobQueue_[processTime] = std::move(job);
341    } else {
342        while (msgQueue_.find(processTime) != msgQueue_.end()) { // To prevent dropping job unexpectedly
343            MEDIA_LOG_W("DUPLICATIVE msgQueue_ TIMESTAMP!!!");
344            processTime++;
345        }
346        msgQueue_[processTime] = std::move(job);
347    }
348    int64_t lastProcessUs = topProcessUs_;
349    if (processTime <= topProcessUs_ || topProcessUs_ == -1) {
350        UpdateTop();
351    }
352    pipelineThread_->UnLockJobState(lastProcessUs != topProcessUs_);
353    return processTime;
354}
355} // namespace Media
356} // namespace OHOS
357