1/*
2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#ifndef HISTREAMER_FOUNDATION_OSAL_TASK_INNER_H
17#define HISTREAMER_FOUNDATION_OSAL_TASK_INNER_H
18
19#include <atomic>
20#include <functional>
21#include <string>
22#include <list>
23#include <map>
24#include "osal/task/condition_variable.h"
25#include "osal/task/mutex.h"
26#include "osal/task/autolock.h"
27#include "osal/task/pipeline_threadpool.h"
28
29#ifdef MEDIA_FOUNDATION_FFRT
30    #include "ffrt.h"
31#else
32    #include <map>
33#endif
34
35
36namespace OHOS {
37namespace Media {
38
39class TaskInner : public std::enable_shared_from_this<TaskInner> {
40public:
41    explicit TaskInner(const std::string& name, const std::string& groupId, TaskType type,
42        TaskPriority priority, bool singleLoop);
43
44    virtual ~TaskInner();
45
46    virtual void Init();
47
48    virtual void DeInit();
49
50    virtual void Start();
51
52    virtual void Stop();
53
54    virtual void StopAsync();
55
56    virtual void Pause();
57
58    virtual void PauseAsync();
59
60    virtual void RegisterJob(const std::function<int64_t()>& job);
61
62    virtual void SubmitJobOnce(const std::function<void()>& job, int64_t delay, bool wait);
63
64    virtual void SubmitJob(const std::function<void()>& job, int64_t delay, bool wait);
65
66    virtual bool IsTaskRunning() { return runningState_ == RunningState::STARTED; }
67
68    virtual void UpdateDelayTime(int64_t delayUs);
69
70    void SetEnableStateChangeLog(bool enable) { isStateLogEnabled_ = enable; }
71
72    int64_t NextJobUs();
73
74    void HandleJob();
75
76    static void SleepInTask(unsigned ms);
77
78private:
79    enum class RunningState : int {
80        STARTED,
81        PAUSING,
82        PAUSED,
83        STOPPING,
84        STOPPED,
85    };
86
87    const std::string name_;
88    std::atomic<RunningState> runningState_{RunningState::PAUSED};
89    std::atomic<bool> jobState_{false};
90    std::function<int64_t()> job_;
91    bool singleLoop_ = false;
92    int64_t topProcessUs_ {-1};
93    bool topIsJob_ = false;
94    std::shared_ptr<PipeLineThread> pipelineThread_;
95    std::atomic<bool> isStateLogEnabled_{true};
96#ifdef MEDIA_FOUNDATION_FFRT
97    void DoJob(const std::function<void()>& job);
98    std::shared_ptr<ffrt::queue> jobQueue_;
99    Mutex stateMutex_;
100    ConditionVariable syncCond_;
101    ffrt::recursive_mutex jobMutex_;
102#else
103    void UpdateTop();
104
105    int64_t InsertJob(const std::function<void()>& job, int64_t delayUs, bool inJobQueue);
106
107    Mutex stateMutex_{};
108    FairMutex jobMutex_{};
109    ConditionVariable syncCond_{};
110    ConditionVariable replyCond_{};
111    std::map<int64_t, std::function<void()>> msgQueue_;  // msg will be sorted by timeUs
112    std::map<int64_t, std::function<void()>> jobQueue_;  // msg will be sorted by timeUs
113#endif
114};
115} // namespace Media
116} // namespace OHOS
117#endif // HISTREAMER_FOUNDATION_OSAL_TASK_H
118
119