1049e185fSopenharmony_ci/*
2049e185fSopenharmony_ci * Copyright (C) 2021 Huawei Device Co., Ltd.
3049e185fSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4049e185fSopenharmony_ci * you may not use this file except in compliance with the License.
5049e185fSopenharmony_ci * You may obtain a copy of the License at
6049e185fSopenharmony_ci *
7049e185fSopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8049e185fSopenharmony_ci *
9049e185fSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10049e185fSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11049e185fSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12049e185fSopenharmony_ci * See the License for the specific language governing permissions and
13049e185fSopenharmony_ci * limitations under the License.
14049e185fSopenharmony_ci */
15049e185fSopenharmony_ci
16049e185fSopenharmony_ci#include <sys/types.h>
17049e185fSopenharmony_ci#include <unistd.h>
18049e185fSopenharmony_ci#include <malloc.h>
19049e185fSopenharmony_ci#include "task_queue.h"
20049e185fSopenharmony_ci#include "media_log.h"
21049e185fSopenharmony_ci#include "media_errors.h"
22049e185fSopenharmony_ci
23049e185fSopenharmony_ciusing namespace OHOS::QOS;
24049e185fSopenharmony_ci
25049e185fSopenharmony_cinamespace {
26049e185fSopenharmony_ci    constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_ONLY_PRERELEASE, LOG_DOMAIN_PLAYER, "TaskQueue" };
27049e185fSopenharmony_ci}
28049e185fSopenharmony_ci
29049e185fSopenharmony_cinamespace OHOS {
30049e185fSopenharmony_cinamespace Media {
31049e185fSopenharmony_ciTaskQueue::~TaskQueue()
32049e185fSopenharmony_ci{
33049e185fSopenharmony_ci    (void)Stop();
34049e185fSopenharmony_ci}
35049e185fSopenharmony_ci
36049e185fSopenharmony_ciint32_t TaskQueue::Start()
37049e185fSopenharmony_ci{
38049e185fSopenharmony_ci    std::unique_lock<std::mutex> lock(mutex_);
39049e185fSopenharmony_ci    CHECK_AND_RETURN_RET_LOG(thread_ == nullptr,
40049e185fSopenharmony_ci        MSERR_OK, "Started already, ignore ! [%{public}s]", name_.c_str());
41049e185fSopenharmony_ci    isExit_ = false;
42049e185fSopenharmony_ci    thread_ = std::make_unique<std::thread>(&TaskQueue::TaskProcessor, this);
43049e185fSopenharmony_ci    uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
44049e185fSopenharmony_ci    MEDIA_LOGI("0x%{public}06" PRIXPTR " Instance thread started [%{public}s], curTimeUs: [%{public}" PRIu64 "]",
45049e185fSopenharmony_ci        FAKE_POINTER(this), name_.c_str(), curTimeNs);
46049e185fSopenharmony_ci    return MSERR_OK;
47049e185fSopenharmony_ci}
48049e185fSopenharmony_ci
49049e185fSopenharmony_ciint32_t TaskQueue::Stop() noexcept
50049e185fSopenharmony_ci{
51049e185fSopenharmony_ci    std::unique_lock<std::mutex> lock(mutex_);
52049e185fSopenharmony_ci    if (isExit_) {
53049e185fSopenharmony_ci        MEDIA_LOGD("Stopped already, ignore ! [%{public}s]", name_.c_str());
54049e185fSopenharmony_ci        return MSERR_OK;
55049e185fSopenharmony_ci    }
56049e185fSopenharmony_ci
57049e185fSopenharmony_ci    if (std::this_thread::get_id() == thread_->get_id()) {
58049e185fSopenharmony_ci        MEDIA_LOGI("Stop at the task thread, reject");
59049e185fSopenharmony_ci        return MSERR_INVALID_OPERATION;
60049e185fSopenharmony_ci    }
61049e185fSopenharmony_ci
62049e185fSopenharmony_ci    std::unique_ptr<std::thread> t;
63049e185fSopenharmony_ci    isExit_ = true;
64049e185fSopenharmony_ci    cond_.notify_all();
65049e185fSopenharmony_ci    std::swap(thread_, t);
66049e185fSopenharmony_ci    lock.unlock();
67049e185fSopenharmony_ci
68049e185fSopenharmony_ci    if (t != nullptr && t->joinable()) {
69049e185fSopenharmony_ci        t->join();
70049e185fSopenharmony_ci    }
71049e185fSopenharmony_ci
72049e185fSopenharmony_ci    lock.lock();
73049e185fSopenharmony_ci    CancelNotExecutedTaskLocked();
74049e185fSopenharmony_ci    return MSERR_OK;
75049e185fSopenharmony_ci}
76049e185fSopenharmony_ci
77049e185fSopenharmony_civoid TaskQueue::SetQos(const QosLevel level)
78049e185fSopenharmony_ci{
79049e185fSopenharmony_ci    if (tid_ == -1) {
80049e185fSopenharmony_ci        MEDIA_LOGW("SetQos thread level failed, tid invalid");
81049e185fSopenharmony_ci        return;
82049e185fSopenharmony_ci    }
83049e185fSopenharmony_ci    MEDIA_LOGI("SetQos thread [%{public}d] level [%{public}d]", static_cast<int>(tid_), static_cast<int>(level));
84049e185fSopenharmony_ci    SetQosForOtherThread(level, tid_);
85049e185fSopenharmony_ci}
86049e185fSopenharmony_ci
87049e185fSopenharmony_civoid TaskQueue::ResetQos()
88049e185fSopenharmony_ci{
89049e185fSopenharmony_ci    if (tid_ == -1) {
90049e185fSopenharmony_ci        MEDIA_LOGW("ResetQos thread level failed, tid invalid");
91049e185fSopenharmony_ci        return;
92049e185fSopenharmony_ci    }
93049e185fSopenharmony_ci    ResetQosForOtherThread(tid_);
94049e185fSopenharmony_ci    MEDIA_LOGI("ResetQos thread [%{public}d] ok", static_cast<int>(tid_));
95049e185fSopenharmony_ci}
96049e185fSopenharmony_ci
97049e185fSopenharmony_ci// cancelNotExecuted = false, delayUs = 0ULL.
98049e185fSopenharmony_ci__attribute__((no_sanitize("cfi"))) int32_t TaskQueue::EnqueueTask(const std::shared_ptr<ITaskHandler> &task,
99049e185fSopenharmony_ci    bool cancelNotExecuted, uint64_t delayUs)
100049e185fSopenharmony_ci{
101049e185fSopenharmony_ci    constexpr uint64_t MAX_DELAY_US = 10000000ULL; // max delay.
102049e185fSopenharmony_ci
103049e185fSopenharmony_ci    CHECK_AND_RETURN_RET_LOG(task != nullptr, MSERR_INVALID_VAL,
104049e185fSopenharmony_ci        "Enqueue task when taskqueue task is nullptr.[%{public}s]", name_.c_str());
105049e185fSopenharmony_ci
106049e185fSopenharmony_ci    task->Clear();
107049e185fSopenharmony_ci
108049e185fSopenharmony_ci    CHECK_AND_RETURN_RET_LOG(delayUs < MAX_DELAY_US, MSERR_INVALID_VAL,
109049e185fSopenharmony_ci        "Enqueue task when taskqueue delayUs[%{public}" PRIu64 "] is >= max delayUs[ %{public}" PRIu64
110049e185fSopenharmony_ci        "], invalid! [%{public}s]",
111049e185fSopenharmony_ci        delayUs, MAX_DELAY_US, name_.c_str());
112049e185fSopenharmony_ci
113049e185fSopenharmony_ci    std::unique_lock<std::mutex> lock(mutex_);
114049e185fSopenharmony_ci    CHECK_AND_RETURN_RET_LOG(!isExit_, MSERR_INVALID_OPERATION,
115049e185fSopenharmony_ci        "Enqueue task when taskqueue is stopped, failed ! [%{public}s]", name_.c_str());
116049e185fSopenharmony_ci
117049e185fSopenharmony_ci    if (cancelNotExecuted) {
118049e185fSopenharmony_ci        CancelNotExecutedTaskLocked();
119049e185fSopenharmony_ci    }
120049e185fSopenharmony_ci
121049e185fSopenharmony_ci    // 1000 is ns to us.
122049e185fSopenharmony_ci    constexpr uint32_t US_TO_NS = 1000;
123049e185fSopenharmony_ci    uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
124049e185fSopenharmony_ci    CHECK_AND_RETURN_RET_LOG(curTimeNs < UINT64_MAX - delayUs * US_TO_NS, MSERR_INVALID_OPERATION,
125049e185fSopenharmony_ci        "Enqueue task but timestamp is overflow, why? [%{public}s]", name_.c_str());
126049e185fSopenharmony_ci
127049e185fSopenharmony_ci    uint64_t executeTimeNs = delayUs * US_TO_NS + curTimeNs;
128049e185fSopenharmony_ci    auto iter = std::find_if(taskList_.begin(), taskList_.end(), [executeTimeNs](const TaskHandlerItem &item) {
129049e185fSopenharmony_ci        return (item.executeTimeNs_ > executeTimeNs);
130049e185fSopenharmony_ci    });
131049e185fSopenharmony_ci    (void)taskList_.insert(iter, {task, executeTimeNs});
132049e185fSopenharmony_ci    cond_.notify_all();
133049e185fSopenharmony_ci
134049e185fSopenharmony_ci    return 0;
135049e185fSopenharmony_ci}
136049e185fSopenharmony_ci
137049e185fSopenharmony_ci__attribute__((no_sanitize("cfi"))) void TaskQueue::CancelNotExecutedTaskLocked()
138049e185fSopenharmony_ci{
139049e185fSopenharmony_ci    MEDIA_LOGD("All task not executed are being cancelled..........[%{public}s]", name_.c_str());
140049e185fSopenharmony_ci    while (!taskList_.empty()) {
141049e185fSopenharmony_ci        std::shared_ptr<ITaskHandler> task = taskList_.front().task_;
142049e185fSopenharmony_ci        taskList_.pop_front();
143049e185fSopenharmony_ci        if (task != nullptr) {
144049e185fSopenharmony_ci            task->Cancel();
145049e185fSopenharmony_ci        }
146049e185fSopenharmony_ci    }
147049e185fSopenharmony_ci}
148049e185fSopenharmony_ci
149049e185fSopenharmony_ci__attribute__((no_sanitize("cfi"))) void TaskQueue::TaskProcessor()
150049e185fSopenharmony_ci{
151049e185fSopenharmony_ci    constexpr uint32_t nameSizeMax = 15;
152049e185fSopenharmony_ci    tid_ = gettid();
153049e185fSopenharmony_ci    MEDIA_LOGI("Enter TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_);
154049e185fSopenharmony_ci    pthread_setname_np(pthread_self(), name_.substr(0, nameSizeMax).c_str());
155049e185fSopenharmony_ci    (void)mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
156049e185fSopenharmony_ci    while (true) {
157049e185fSopenharmony_ci        std::unique_lock<std::mutex> lock(mutex_);
158049e185fSopenharmony_ci        cond_.wait(lock, [this] { return isExit_ || !taskList_.empty(); });
159049e185fSopenharmony_ci        if (isExit_) {
160049e185fSopenharmony_ci            MEDIA_LOGI("Exit TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_);
161049e185fSopenharmony_ci            return;
162049e185fSopenharmony_ci        }
163049e185fSopenharmony_ci        TaskHandlerItem item = taskList_.front();
164049e185fSopenharmony_ci        uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
165049e185fSopenharmony_ci        if (curTimeNs >= item.executeTimeNs_) {
166049e185fSopenharmony_ci            taskList_.pop_front();
167049e185fSopenharmony_ci        } else {
168049e185fSopenharmony_ci            uint64_t diff =  item.executeTimeNs_ - curTimeNs;
169049e185fSopenharmony_ci            (void)cond_.wait_for(lock, std::chrono::nanoseconds(diff));
170049e185fSopenharmony_ci            continue;
171049e185fSopenharmony_ci        }
172049e185fSopenharmony_ci        isTaskExecuting_ = true;
173049e185fSopenharmony_ci        lock.unlock();
174049e185fSopenharmony_ci
175049e185fSopenharmony_ci        if (item.task_ == nullptr || item.task_->IsCanceled()) {
176049e185fSopenharmony_ci            MEDIA_LOGD("task is nullptr or task canceled. [%{public}s]", name_.c_str());
177049e185fSopenharmony_ci            lock.lock();
178049e185fSopenharmony_ci            isTaskExecuting_ = false;
179049e185fSopenharmony_ci            lock.unlock();
180049e185fSopenharmony_ci            continue;
181049e185fSopenharmony_ci        }
182049e185fSopenharmony_ci
183049e185fSopenharmony_ci        item.task_->Execute();
184049e185fSopenharmony_ci        lock.lock();
185049e185fSopenharmony_ci        isTaskExecuting_ = false;
186049e185fSopenharmony_ci        lock.unlock();
187049e185fSopenharmony_ci        if (item.task_->GetAttribute().periodicTimeUs_ == UINT64_MAX) {
188049e185fSopenharmony_ci            continue;
189049e185fSopenharmony_ci        }
190049e185fSopenharmony_ci        int32_t res = EnqueueTask(item.task_, false, item.task_->GetAttribute().periodicTimeUs_);
191049e185fSopenharmony_ci        if (res != MSERR_OK) {
192049e185fSopenharmony_ci            MEDIA_LOGW("enqueue periodic task failed:%d, why? [%{public}s]", res, name_.c_str());
193049e185fSopenharmony_ci        }
194049e185fSopenharmony_ci    }
195049e185fSopenharmony_ci    (void)mallopt(M_FLUSH_THREAD_CACHE, 0);
196049e185fSopenharmony_ci    MEDIA_LOGI("Leave TaskProcessor [%{public}s]", name_.c_str());
197049e185fSopenharmony_ci}
198049e185fSopenharmony_ci
199049e185fSopenharmony_cibool TaskQueue::IsTaskExecuting()
200049e185fSopenharmony_ci{
201049e185fSopenharmony_ci    std::unique_lock<std::mutex> lock(mutex_);
202049e185fSopenharmony_ci    return isTaskExecuting_;
203049e185fSopenharmony_ci}
204049e185fSopenharmony_ci} // namespace Media
205049e185fSopenharmony_ci} // namespace OHOS
206