1/* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include <sys/types.h> 17#include <unistd.h> 18#include <malloc.h> 19#include "task_queue.h" 20#include "media_log.h" 21#include "media_errors.h" 22 23using namespace OHOS::QOS; 24 25namespace { 26 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_ONLY_PRERELEASE, LOG_DOMAIN_PLAYER, "TaskQueue" }; 27} 28 29namespace OHOS { 30namespace Media { 31TaskQueue::~TaskQueue() 32{ 33 (void)Stop(); 34} 35 36int32_t TaskQueue::Start() 37{ 38 std::unique_lock<std::mutex> lock(mutex_); 39 CHECK_AND_RETURN_RET_LOG(thread_ == nullptr, 40 MSERR_OK, "Started already, ignore ! [%{public}s]", name_.c_str()); 41 isExit_ = false; 42 thread_ = std::make_unique<std::thread>(&TaskQueue::TaskProcessor, this); 43 uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()); 44 MEDIA_LOGI("0x%{public}06" PRIXPTR " Instance thread started [%{public}s], curTimeUs: [%{public}" PRIu64 "]", 45 FAKE_POINTER(this), name_.c_str(), curTimeNs); 46 return MSERR_OK; 47} 48 49int32_t TaskQueue::Stop() noexcept 50{ 51 std::unique_lock<std::mutex> lock(mutex_); 52 if (isExit_) { 53 MEDIA_LOGD("Stopped already, ignore ! [%{public}s]", name_.c_str()); 54 return MSERR_OK; 55 } 56 57 if (std::this_thread::get_id() == thread_->get_id()) { 58 MEDIA_LOGI("Stop at the task thread, reject"); 59 return MSERR_INVALID_OPERATION; 60 } 61 62 std::unique_ptr<std::thread> t; 63 isExit_ = true; 64 cond_.notify_all(); 65 std::swap(thread_, t); 66 lock.unlock(); 67 68 if (t != nullptr && t->joinable()) { 69 t->join(); 70 } 71 72 lock.lock(); 73 CancelNotExecutedTaskLocked(); 74 return MSERR_OK; 75} 76 77void TaskQueue::SetQos(const QosLevel level) 78{ 79 if (tid_ == -1) { 80 MEDIA_LOGW("SetQos thread level failed, tid invalid"); 81 return; 82 } 83 MEDIA_LOGI("SetQos thread [%{public}d] level [%{public}d]", static_cast<int>(tid_), static_cast<int>(level)); 84 SetQosForOtherThread(level, tid_); 85} 86 87void TaskQueue::ResetQos() 88{ 89 if (tid_ == -1) { 90 MEDIA_LOGW("ResetQos thread level failed, tid invalid"); 91 return; 92 } 93 ResetQosForOtherThread(tid_); 94 MEDIA_LOGI("ResetQos thread [%{public}d] ok", static_cast<int>(tid_)); 95} 96 97// cancelNotExecuted = false, delayUs = 0ULL. 98__attribute__((no_sanitize("cfi"))) int32_t TaskQueue::EnqueueTask(const std::shared_ptr<ITaskHandler> &task, 99 bool cancelNotExecuted, uint64_t delayUs) 100{ 101 constexpr uint64_t MAX_DELAY_US = 10000000ULL; // max delay. 102 103 CHECK_AND_RETURN_RET_LOG(task != nullptr, MSERR_INVALID_VAL, 104 "Enqueue task when taskqueue task is nullptr.[%{public}s]", name_.c_str()); 105 106 task->Clear(); 107 108 CHECK_AND_RETURN_RET_LOG(delayUs < MAX_DELAY_US, MSERR_INVALID_VAL, 109 "Enqueue task when taskqueue delayUs[%{public}" PRIu64 "] is >= max delayUs[ %{public}" PRIu64 110 "], invalid! [%{public}s]", 111 delayUs, MAX_DELAY_US, name_.c_str()); 112 113 std::unique_lock<std::mutex> lock(mutex_); 114 CHECK_AND_RETURN_RET_LOG(!isExit_, MSERR_INVALID_OPERATION, 115 "Enqueue task when taskqueue is stopped, failed ! [%{public}s]", name_.c_str()); 116 117 if (cancelNotExecuted) { 118 CancelNotExecutedTaskLocked(); 119 } 120 121 // 1000 is ns to us. 122 constexpr uint32_t US_TO_NS = 1000; 123 uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()); 124 CHECK_AND_RETURN_RET_LOG(curTimeNs < UINT64_MAX - delayUs * US_TO_NS, MSERR_INVALID_OPERATION, 125 "Enqueue task but timestamp is overflow, why? [%{public}s]", name_.c_str()); 126 127 uint64_t executeTimeNs = delayUs * US_TO_NS + curTimeNs; 128 auto iter = std::find_if(taskList_.begin(), taskList_.end(), [executeTimeNs](const TaskHandlerItem &item) { 129 return (item.executeTimeNs_ > executeTimeNs); 130 }); 131 (void)taskList_.insert(iter, {task, executeTimeNs}); 132 cond_.notify_all(); 133 134 return 0; 135} 136 137__attribute__((no_sanitize("cfi"))) void TaskQueue::CancelNotExecutedTaskLocked() 138{ 139 MEDIA_LOGD("All task not executed are being cancelled..........[%{public}s]", name_.c_str()); 140 while (!taskList_.empty()) { 141 std::shared_ptr<ITaskHandler> task = taskList_.front().task_; 142 taskList_.pop_front(); 143 if (task != nullptr) { 144 task->Cancel(); 145 } 146 } 147} 148 149__attribute__((no_sanitize("cfi"))) void TaskQueue::TaskProcessor() 150{ 151 constexpr uint32_t nameSizeMax = 15; 152 tid_ = gettid(); 153 MEDIA_LOGI("Enter TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_); 154 pthread_setname_np(pthread_self(), name_.substr(0, nameSizeMax).c_str()); 155 (void)mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE); 156 while (true) { 157 std::unique_lock<std::mutex> lock(mutex_); 158 cond_.wait(lock, [this] { return isExit_ || !taskList_.empty(); }); 159 if (isExit_) { 160 MEDIA_LOGI("Exit TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_); 161 return; 162 } 163 TaskHandlerItem item = taskList_.front(); 164 uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()); 165 if (curTimeNs >= item.executeTimeNs_) { 166 taskList_.pop_front(); 167 } else { 168 uint64_t diff = item.executeTimeNs_ - curTimeNs; 169 (void)cond_.wait_for(lock, std::chrono::nanoseconds(diff)); 170 continue; 171 } 172 isTaskExecuting_ = true; 173 lock.unlock(); 174 175 if (item.task_ == nullptr || item.task_->IsCanceled()) { 176 MEDIA_LOGD("task is nullptr or task canceled. [%{public}s]", name_.c_str()); 177 lock.lock(); 178 isTaskExecuting_ = false; 179 lock.unlock(); 180 continue; 181 } 182 183 item.task_->Execute(); 184 lock.lock(); 185 isTaskExecuting_ = false; 186 lock.unlock(); 187 if (item.task_->GetAttribute().periodicTimeUs_ == UINT64_MAX) { 188 continue; 189 } 190 int32_t res = EnqueueTask(item.task_, false, item.task_->GetAttribute().periodicTimeUs_); 191 if (res != MSERR_OK) { 192 MEDIA_LOGW("enqueue periodic task failed:%d, why? [%{public}s]", res, name_.c_str()); 193 } 194 } 195 (void)mallopt(M_FLUSH_THREAD_CACHE, 0); 196 MEDIA_LOGI("Leave TaskProcessor [%{public}s]", name_.c_str()); 197} 198 199bool TaskQueue::IsTaskExecuting() 200{ 201 std::unique_lock<std::mutex> lock(mutex_); 202 return isTaskExecuting_; 203} 204} // namespace Media 205} // namespace OHOS 206