1/*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <sys/types.h>
17#include <unistd.h>
18#include <malloc.h>
19#include "task_queue.h"
20#include "media_log.h"
21#include "media_errors.h"
22
23using namespace OHOS::QOS;
24
25namespace {
26    constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_ONLY_PRERELEASE, LOG_DOMAIN_PLAYER, "TaskQueue" };
27}
28
29namespace OHOS {
30namespace Media {
31TaskQueue::~TaskQueue()
32{
33    (void)Stop();
34}
35
36int32_t TaskQueue::Start()
37{
38    std::unique_lock<std::mutex> lock(mutex_);
39    CHECK_AND_RETURN_RET_LOG(thread_ == nullptr,
40        MSERR_OK, "Started already, ignore ! [%{public}s]", name_.c_str());
41    isExit_ = false;
42    thread_ = std::make_unique<std::thread>(&TaskQueue::TaskProcessor, this);
43    uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
44    MEDIA_LOGI("0x%{public}06" PRIXPTR " Instance thread started [%{public}s], curTimeUs: [%{public}" PRIu64 "]",
45        FAKE_POINTER(this), name_.c_str(), curTimeNs);
46    return MSERR_OK;
47}
48
49int32_t TaskQueue::Stop() noexcept
50{
51    std::unique_lock<std::mutex> lock(mutex_);
52    if (isExit_) {
53        MEDIA_LOGD("Stopped already, ignore ! [%{public}s]", name_.c_str());
54        return MSERR_OK;
55    }
56
57    if (std::this_thread::get_id() == thread_->get_id()) {
58        MEDIA_LOGI("Stop at the task thread, reject");
59        return MSERR_INVALID_OPERATION;
60    }
61
62    std::unique_ptr<std::thread> t;
63    isExit_ = true;
64    cond_.notify_all();
65    std::swap(thread_, t);
66    lock.unlock();
67
68    if (t != nullptr && t->joinable()) {
69        t->join();
70    }
71
72    lock.lock();
73    CancelNotExecutedTaskLocked();
74    return MSERR_OK;
75}
76
77void TaskQueue::SetQos(const QosLevel level)
78{
79    if (tid_ == -1) {
80        MEDIA_LOGW("SetQos thread level failed, tid invalid");
81        return;
82    }
83    MEDIA_LOGI("SetQos thread [%{public}d] level [%{public}d]", static_cast<int>(tid_), static_cast<int>(level));
84    SetQosForOtherThread(level, tid_);
85}
86
87void TaskQueue::ResetQos()
88{
89    if (tid_ == -1) {
90        MEDIA_LOGW("ResetQos thread level failed, tid invalid");
91        return;
92    }
93    ResetQosForOtherThread(tid_);
94    MEDIA_LOGI("ResetQos thread [%{public}d] ok", static_cast<int>(tid_));
95}
96
97// cancelNotExecuted = false, delayUs = 0ULL.
98__attribute__((no_sanitize("cfi"))) int32_t TaskQueue::EnqueueTask(const std::shared_ptr<ITaskHandler> &task,
99    bool cancelNotExecuted, uint64_t delayUs)
100{
101    constexpr uint64_t MAX_DELAY_US = 10000000ULL; // max delay.
102
103    CHECK_AND_RETURN_RET_LOG(task != nullptr, MSERR_INVALID_VAL,
104        "Enqueue task when taskqueue task is nullptr.[%{public}s]", name_.c_str());
105
106    task->Clear();
107
108    CHECK_AND_RETURN_RET_LOG(delayUs < MAX_DELAY_US, MSERR_INVALID_VAL,
109        "Enqueue task when taskqueue delayUs[%{public}" PRIu64 "] is >= max delayUs[ %{public}" PRIu64
110        "], invalid! [%{public}s]",
111        delayUs, MAX_DELAY_US, name_.c_str());
112
113    std::unique_lock<std::mutex> lock(mutex_);
114    CHECK_AND_RETURN_RET_LOG(!isExit_, MSERR_INVALID_OPERATION,
115        "Enqueue task when taskqueue is stopped, failed ! [%{public}s]", name_.c_str());
116
117    if (cancelNotExecuted) {
118        CancelNotExecutedTaskLocked();
119    }
120
121    // 1000 is ns to us.
122    constexpr uint32_t US_TO_NS = 1000;
123    uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
124    CHECK_AND_RETURN_RET_LOG(curTimeNs < UINT64_MAX - delayUs * US_TO_NS, MSERR_INVALID_OPERATION,
125        "Enqueue task but timestamp is overflow, why? [%{public}s]", name_.c_str());
126
127    uint64_t executeTimeNs = delayUs * US_TO_NS + curTimeNs;
128    auto iter = std::find_if(taskList_.begin(), taskList_.end(), [executeTimeNs](const TaskHandlerItem &item) {
129        return (item.executeTimeNs_ > executeTimeNs);
130    });
131    (void)taskList_.insert(iter, {task, executeTimeNs});
132    cond_.notify_all();
133
134    return 0;
135}
136
137__attribute__((no_sanitize("cfi"))) void TaskQueue::CancelNotExecutedTaskLocked()
138{
139    MEDIA_LOGD("All task not executed are being cancelled..........[%{public}s]", name_.c_str());
140    while (!taskList_.empty()) {
141        std::shared_ptr<ITaskHandler> task = taskList_.front().task_;
142        taskList_.pop_front();
143        if (task != nullptr) {
144            task->Cancel();
145        }
146    }
147}
148
149__attribute__((no_sanitize("cfi"))) void TaskQueue::TaskProcessor()
150{
151    constexpr uint32_t nameSizeMax = 15;
152    tid_ = gettid();
153    MEDIA_LOGI("Enter TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_);
154    pthread_setname_np(pthread_self(), name_.substr(0, nameSizeMax).c_str());
155    (void)mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
156    while (true) {
157        std::unique_lock<std::mutex> lock(mutex_);
158        cond_.wait(lock, [this] { return isExit_ || !taskList_.empty(); });
159        if (isExit_) {
160            MEDIA_LOGI("Exit TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_);
161            return;
162        }
163        TaskHandlerItem item = taskList_.front();
164        uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
165        if (curTimeNs >= item.executeTimeNs_) {
166            taskList_.pop_front();
167        } else {
168            uint64_t diff =  item.executeTimeNs_ - curTimeNs;
169            (void)cond_.wait_for(lock, std::chrono::nanoseconds(diff));
170            continue;
171        }
172        isTaskExecuting_ = true;
173        lock.unlock();
174
175        if (item.task_ == nullptr || item.task_->IsCanceled()) {
176            MEDIA_LOGD("task is nullptr or task canceled. [%{public}s]", name_.c_str());
177            lock.lock();
178            isTaskExecuting_ = false;
179            lock.unlock();
180            continue;
181        }
182
183        item.task_->Execute();
184        lock.lock();
185        isTaskExecuting_ = false;
186        lock.unlock();
187        if (item.task_->GetAttribute().periodicTimeUs_ == UINT64_MAX) {
188            continue;
189        }
190        int32_t res = EnqueueTask(item.task_, false, item.task_->GetAttribute().periodicTimeUs_);
191        if (res != MSERR_OK) {
192            MEDIA_LOGW("enqueue periodic task failed:%d, why? [%{public}s]", res, name_.c_str());
193        }
194    }
195    (void)mallopt(M_FLUSH_THREAD_CACHE, 0);
196    MEDIA_LOGI("Leave TaskProcessor [%{public}s]", name_.c_str());
197}
198
199bool TaskQueue::IsTaskExecuting()
200{
201    std::unique_lock<std::mutex> lock(mutex_);
202    return isTaskExecuting_;
203}
204} // namespace Media
205} // namespace OHOS
206