1049e185fSopenharmony_ci/* 2049e185fSopenharmony_ci * Copyright (C) 2021 Huawei Device Co., Ltd. 3049e185fSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4049e185fSopenharmony_ci * you may not use this file except in compliance with the License. 5049e185fSopenharmony_ci * You may obtain a copy of the License at 6049e185fSopenharmony_ci * 7049e185fSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8049e185fSopenharmony_ci * 9049e185fSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10049e185fSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11049e185fSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12049e185fSopenharmony_ci * See the License for the specific language governing permissions and 13049e185fSopenharmony_ci * limitations under the License. 14049e185fSopenharmony_ci */ 15049e185fSopenharmony_ci 16049e185fSopenharmony_ci#include <sys/types.h> 17049e185fSopenharmony_ci#include <unistd.h> 18049e185fSopenharmony_ci#include <malloc.h> 19049e185fSopenharmony_ci#include "task_queue.h" 20049e185fSopenharmony_ci#include "media_log.h" 21049e185fSopenharmony_ci#include "media_errors.h" 22049e185fSopenharmony_ci 23049e185fSopenharmony_ciusing namespace OHOS::QOS; 24049e185fSopenharmony_ci 25049e185fSopenharmony_cinamespace { 26049e185fSopenharmony_ci constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_ONLY_PRERELEASE, LOG_DOMAIN_PLAYER, "TaskQueue" }; 27049e185fSopenharmony_ci} 28049e185fSopenharmony_ci 29049e185fSopenharmony_cinamespace OHOS { 30049e185fSopenharmony_cinamespace Media { 31049e185fSopenharmony_ciTaskQueue::~TaskQueue() 32049e185fSopenharmony_ci{ 33049e185fSopenharmony_ci (void)Stop(); 34049e185fSopenharmony_ci} 35049e185fSopenharmony_ci 36049e185fSopenharmony_ciint32_t TaskQueue::Start() 37049e185fSopenharmony_ci{ 38049e185fSopenharmony_ci std::unique_lock<std::mutex> lock(mutex_); 39049e185fSopenharmony_ci CHECK_AND_RETURN_RET_LOG(thread_ == nullptr, 40049e185fSopenharmony_ci MSERR_OK, "Started already, ignore ! [%{public}s]", name_.c_str()); 41049e185fSopenharmony_ci isExit_ = false; 42049e185fSopenharmony_ci thread_ = std::make_unique<std::thread>(&TaskQueue::TaskProcessor, this); 43049e185fSopenharmony_ci uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()); 44049e185fSopenharmony_ci MEDIA_LOGI("0x%{public}06" PRIXPTR " Instance thread started [%{public}s], curTimeUs: [%{public}" PRIu64 "]", 45049e185fSopenharmony_ci FAKE_POINTER(this), name_.c_str(), curTimeNs); 46049e185fSopenharmony_ci return MSERR_OK; 47049e185fSopenharmony_ci} 48049e185fSopenharmony_ci 49049e185fSopenharmony_ciint32_t TaskQueue::Stop() noexcept 50049e185fSopenharmony_ci{ 51049e185fSopenharmony_ci std::unique_lock<std::mutex> lock(mutex_); 52049e185fSopenharmony_ci if (isExit_) { 53049e185fSopenharmony_ci MEDIA_LOGD("Stopped already, ignore ! [%{public}s]", name_.c_str()); 54049e185fSopenharmony_ci return MSERR_OK; 55049e185fSopenharmony_ci } 56049e185fSopenharmony_ci 57049e185fSopenharmony_ci if (std::this_thread::get_id() == thread_->get_id()) { 58049e185fSopenharmony_ci MEDIA_LOGI("Stop at the task thread, reject"); 59049e185fSopenharmony_ci return MSERR_INVALID_OPERATION; 60049e185fSopenharmony_ci } 61049e185fSopenharmony_ci 62049e185fSopenharmony_ci std::unique_ptr<std::thread> t; 63049e185fSopenharmony_ci isExit_ = true; 64049e185fSopenharmony_ci cond_.notify_all(); 65049e185fSopenharmony_ci std::swap(thread_, t); 66049e185fSopenharmony_ci lock.unlock(); 67049e185fSopenharmony_ci 68049e185fSopenharmony_ci if (t != nullptr && t->joinable()) { 69049e185fSopenharmony_ci t->join(); 70049e185fSopenharmony_ci } 71049e185fSopenharmony_ci 72049e185fSopenharmony_ci lock.lock(); 73049e185fSopenharmony_ci CancelNotExecutedTaskLocked(); 74049e185fSopenharmony_ci return MSERR_OK; 75049e185fSopenharmony_ci} 76049e185fSopenharmony_ci 77049e185fSopenharmony_civoid TaskQueue::SetQos(const QosLevel level) 78049e185fSopenharmony_ci{ 79049e185fSopenharmony_ci if (tid_ == -1) { 80049e185fSopenharmony_ci MEDIA_LOGW("SetQos thread level failed, tid invalid"); 81049e185fSopenharmony_ci return; 82049e185fSopenharmony_ci } 83049e185fSopenharmony_ci MEDIA_LOGI("SetQos thread [%{public}d] level [%{public}d]", static_cast<int>(tid_), static_cast<int>(level)); 84049e185fSopenharmony_ci SetQosForOtherThread(level, tid_); 85049e185fSopenharmony_ci} 86049e185fSopenharmony_ci 87049e185fSopenharmony_civoid TaskQueue::ResetQos() 88049e185fSopenharmony_ci{ 89049e185fSopenharmony_ci if (tid_ == -1) { 90049e185fSopenharmony_ci MEDIA_LOGW("ResetQos thread level failed, tid invalid"); 91049e185fSopenharmony_ci return; 92049e185fSopenharmony_ci } 93049e185fSopenharmony_ci ResetQosForOtherThread(tid_); 94049e185fSopenharmony_ci MEDIA_LOGI("ResetQos thread [%{public}d] ok", static_cast<int>(tid_)); 95049e185fSopenharmony_ci} 96049e185fSopenharmony_ci 97049e185fSopenharmony_ci// cancelNotExecuted = false, delayUs = 0ULL. 98049e185fSopenharmony_ci__attribute__((no_sanitize("cfi"))) int32_t TaskQueue::EnqueueTask(const std::shared_ptr<ITaskHandler> &task, 99049e185fSopenharmony_ci bool cancelNotExecuted, uint64_t delayUs) 100049e185fSopenharmony_ci{ 101049e185fSopenharmony_ci constexpr uint64_t MAX_DELAY_US = 10000000ULL; // max delay. 102049e185fSopenharmony_ci 103049e185fSopenharmony_ci CHECK_AND_RETURN_RET_LOG(task != nullptr, MSERR_INVALID_VAL, 104049e185fSopenharmony_ci "Enqueue task when taskqueue task is nullptr.[%{public}s]", name_.c_str()); 105049e185fSopenharmony_ci 106049e185fSopenharmony_ci task->Clear(); 107049e185fSopenharmony_ci 108049e185fSopenharmony_ci CHECK_AND_RETURN_RET_LOG(delayUs < MAX_DELAY_US, MSERR_INVALID_VAL, 109049e185fSopenharmony_ci "Enqueue task when taskqueue delayUs[%{public}" PRIu64 "] is >= max delayUs[ %{public}" PRIu64 110049e185fSopenharmony_ci "], invalid! [%{public}s]", 111049e185fSopenharmony_ci delayUs, MAX_DELAY_US, name_.c_str()); 112049e185fSopenharmony_ci 113049e185fSopenharmony_ci std::unique_lock<std::mutex> lock(mutex_); 114049e185fSopenharmony_ci CHECK_AND_RETURN_RET_LOG(!isExit_, MSERR_INVALID_OPERATION, 115049e185fSopenharmony_ci "Enqueue task when taskqueue is stopped, failed ! [%{public}s]", name_.c_str()); 116049e185fSopenharmony_ci 117049e185fSopenharmony_ci if (cancelNotExecuted) { 118049e185fSopenharmony_ci CancelNotExecutedTaskLocked(); 119049e185fSopenharmony_ci } 120049e185fSopenharmony_ci 121049e185fSopenharmony_ci // 1000 is ns to us. 122049e185fSopenharmony_ci constexpr uint32_t US_TO_NS = 1000; 123049e185fSopenharmony_ci uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()); 124049e185fSopenharmony_ci CHECK_AND_RETURN_RET_LOG(curTimeNs < UINT64_MAX - delayUs * US_TO_NS, MSERR_INVALID_OPERATION, 125049e185fSopenharmony_ci "Enqueue task but timestamp is overflow, why? [%{public}s]", name_.c_str()); 126049e185fSopenharmony_ci 127049e185fSopenharmony_ci uint64_t executeTimeNs = delayUs * US_TO_NS + curTimeNs; 128049e185fSopenharmony_ci auto iter = std::find_if(taskList_.begin(), taskList_.end(), [executeTimeNs](const TaskHandlerItem &item) { 129049e185fSopenharmony_ci return (item.executeTimeNs_ > executeTimeNs); 130049e185fSopenharmony_ci }); 131049e185fSopenharmony_ci (void)taskList_.insert(iter, {task, executeTimeNs}); 132049e185fSopenharmony_ci cond_.notify_all(); 133049e185fSopenharmony_ci 134049e185fSopenharmony_ci return 0; 135049e185fSopenharmony_ci} 136049e185fSopenharmony_ci 137049e185fSopenharmony_ci__attribute__((no_sanitize("cfi"))) void TaskQueue::CancelNotExecutedTaskLocked() 138049e185fSopenharmony_ci{ 139049e185fSopenharmony_ci MEDIA_LOGD("All task not executed are being cancelled..........[%{public}s]", name_.c_str()); 140049e185fSopenharmony_ci while (!taskList_.empty()) { 141049e185fSopenharmony_ci std::shared_ptr<ITaskHandler> task = taskList_.front().task_; 142049e185fSopenharmony_ci taskList_.pop_front(); 143049e185fSopenharmony_ci if (task != nullptr) { 144049e185fSopenharmony_ci task->Cancel(); 145049e185fSopenharmony_ci } 146049e185fSopenharmony_ci } 147049e185fSopenharmony_ci} 148049e185fSopenharmony_ci 149049e185fSopenharmony_ci__attribute__((no_sanitize("cfi"))) void TaskQueue::TaskProcessor() 150049e185fSopenharmony_ci{ 151049e185fSopenharmony_ci constexpr uint32_t nameSizeMax = 15; 152049e185fSopenharmony_ci tid_ = gettid(); 153049e185fSopenharmony_ci MEDIA_LOGI("Enter TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_); 154049e185fSopenharmony_ci pthread_setname_np(pthread_self(), name_.substr(0, nameSizeMax).c_str()); 155049e185fSopenharmony_ci (void)mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE); 156049e185fSopenharmony_ci while (true) { 157049e185fSopenharmony_ci std::unique_lock<std::mutex> lock(mutex_); 158049e185fSopenharmony_ci cond_.wait(lock, [this] { return isExit_ || !taskList_.empty(); }); 159049e185fSopenharmony_ci if (isExit_) { 160049e185fSopenharmony_ci MEDIA_LOGI("Exit TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_); 161049e185fSopenharmony_ci return; 162049e185fSopenharmony_ci } 163049e185fSopenharmony_ci TaskHandlerItem item = taskList_.front(); 164049e185fSopenharmony_ci uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()); 165049e185fSopenharmony_ci if (curTimeNs >= item.executeTimeNs_) { 166049e185fSopenharmony_ci taskList_.pop_front(); 167049e185fSopenharmony_ci } else { 168049e185fSopenharmony_ci uint64_t diff = item.executeTimeNs_ - curTimeNs; 169049e185fSopenharmony_ci (void)cond_.wait_for(lock, std::chrono::nanoseconds(diff)); 170049e185fSopenharmony_ci continue; 171049e185fSopenharmony_ci } 172049e185fSopenharmony_ci isTaskExecuting_ = true; 173049e185fSopenharmony_ci lock.unlock(); 174049e185fSopenharmony_ci 175049e185fSopenharmony_ci if (item.task_ == nullptr || item.task_->IsCanceled()) { 176049e185fSopenharmony_ci MEDIA_LOGD("task is nullptr or task canceled. [%{public}s]", name_.c_str()); 177049e185fSopenharmony_ci lock.lock(); 178049e185fSopenharmony_ci isTaskExecuting_ = false; 179049e185fSopenharmony_ci lock.unlock(); 180049e185fSopenharmony_ci continue; 181049e185fSopenharmony_ci } 182049e185fSopenharmony_ci 183049e185fSopenharmony_ci item.task_->Execute(); 184049e185fSopenharmony_ci lock.lock(); 185049e185fSopenharmony_ci isTaskExecuting_ = false; 186049e185fSopenharmony_ci lock.unlock(); 187049e185fSopenharmony_ci if (item.task_->GetAttribute().periodicTimeUs_ == UINT64_MAX) { 188049e185fSopenharmony_ci continue; 189049e185fSopenharmony_ci } 190049e185fSopenharmony_ci int32_t res = EnqueueTask(item.task_, false, item.task_->GetAttribute().periodicTimeUs_); 191049e185fSopenharmony_ci if (res != MSERR_OK) { 192049e185fSopenharmony_ci MEDIA_LOGW("enqueue periodic task failed:%d, why? [%{public}s]", res, name_.c_str()); 193049e185fSopenharmony_ci } 194049e185fSopenharmony_ci } 195049e185fSopenharmony_ci (void)mallopt(M_FLUSH_THREAD_CACHE, 0); 196049e185fSopenharmony_ci MEDIA_LOGI("Leave TaskProcessor [%{public}s]", name_.c_str()); 197049e185fSopenharmony_ci} 198049e185fSopenharmony_ci 199049e185fSopenharmony_cibool TaskQueue::IsTaskExecuting() 200049e185fSopenharmony_ci{ 201049e185fSopenharmony_ci std::unique_lock<std::mutex> lock(mutex_); 202049e185fSopenharmony_ci return isTaskExecuting_; 203049e185fSopenharmony_ci} 204049e185fSopenharmony_ci} // namespace Media 205049e185fSopenharmony_ci} // namespace OHOS 206