1c29fa5a6Sopenharmony_ci/* 2c29fa5a6Sopenharmony_ci * Copyright (c) 2023 Huawei Device Co., Ltd. 3c29fa5a6Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4c29fa5a6Sopenharmony_ci * you may not use this file except in compliance with the License. 5c29fa5a6Sopenharmony_ci * You may obtain a copy of the License at 6c29fa5a6Sopenharmony_ci * 7c29fa5a6Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8c29fa5a6Sopenharmony_ci * 9c29fa5a6Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10c29fa5a6Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11c29fa5a6Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12c29fa5a6Sopenharmony_ci * See the License for the specific language governing permissions and 13c29fa5a6Sopenharmony_ci * limitations under the License. 14c29fa5a6Sopenharmony_ci */ 15c29fa5a6Sopenharmony_ci 16c29fa5a6Sopenharmony_ci#include "task_scheduler.h" 17c29fa5a6Sopenharmony_ci 18c29fa5a6Sopenharmony_ci#include <fcntl.h> 19c29fa5a6Sopenharmony_ci#include <sys/syscall.h> 20c29fa5a6Sopenharmony_ci#include <unistd.h> 21c29fa5a6Sopenharmony_ci 22c29fa5a6Sopenharmony_ci#include "devicestatus_define.h" 23c29fa5a6Sopenharmony_ci 24c29fa5a6Sopenharmony_ci#undef LOG_TAG 25c29fa5a6Sopenharmony_ci#define LOG_TAG "TaskScheduler" 26c29fa5a6Sopenharmony_ci 27c29fa5a6Sopenharmony_cinamespace OHOS { 28c29fa5a6Sopenharmony_cinamespace Msdp { 29c29fa5a6Sopenharmony_cinamespace DeviceStatus { 30c29fa5a6Sopenharmony_ci 31c29fa5a6Sopenharmony_civoid TaskScheduler::Task::ProcessTask() 32c29fa5a6Sopenharmony_ci{ 33c29fa5a6Sopenharmony_ci CALL_DEBUG_ENTER; 34c29fa5a6Sopenharmony_ci if (hasWaited_) { 35c29fa5a6Sopenharmony_ci FI_HILOGE("Expired tasks will be discarded, id:%{public}d", id_); 36c29fa5a6Sopenharmony_ci return; 37c29fa5a6Sopenharmony_ci } 38c29fa5a6Sopenharmony_ci int32_t ret = fun_(); 39c29fa5a6Sopenharmony_ci std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync"); 40c29fa5a6Sopenharmony_ci FI_HILOGD("process:%{public}s, task id:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret); 41c29fa5a6Sopenharmony_ci if (!hasWaited_ && promise_ != nullptr) { 42c29fa5a6Sopenharmony_ci promise_->set_value(ret); 43c29fa5a6Sopenharmony_ci } 44c29fa5a6Sopenharmony_ci} 45c29fa5a6Sopenharmony_ci 46c29fa5a6Sopenharmony_ciTaskScheduler::~TaskScheduler() 47c29fa5a6Sopenharmony_ci{ 48c29fa5a6Sopenharmony_ci if (fds_[0] >= 0) { 49c29fa5a6Sopenharmony_ci if (close(fds_[0]) < 0) { 50c29fa5a6Sopenharmony_ci FI_HILOGE("Close fds_[0] failed, err:%{public}s, fds_[0]:%{public}d", strerror(errno), fds_[0]); 51c29fa5a6Sopenharmony_ci } 52c29fa5a6Sopenharmony_ci fds_[0] = -1; 53c29fa5a6Sopenharmony_ci } 54c29fa5a6Sopenharmony_ci if (fds_[1] >= 0) { 55c29fa5a6Sopenharmony_ci if (close(fds_[1]) < 0) { 56c29fa5a6Sopenharmony_ci FI_HILOGE("Close fds_[1] failed, err:%{public}s, fds_[1]:%{public}d", strerror(errno), fds_[1]); 57c29fa5a6Sopenharmony_ci } 58c29fa5a6Sopenharmony_ci fds_[1] = -1; 59c29fa5a6Sopenharmony_ci } 60c29fa5a6Sopenharmony_ci} 61c29fa5a6Sopenharmony_ci 62c29fa5a6Sopenharmony_cibool TaskScheduler::Init() 63c29fa5a6Sopenharmony_ci{ 64c29fa5a6Sopenharmony_ci CALL_DEBUG_ENTER; 65c29fa5a6Sopenharmony_ci if (::pipe2(fds_, O_CLOEXEC | O_NONBLOCK) != 0) { 66c29fa5a6Sopenharmony_ci FI_HILOGE("pipe2 failed, errno:%{public}s", ::strerror(errno)); 67c29fa5a6Sopenharmony_ci return false; 68c29fa5a6Sopenharmony_ci } 69c29fa5a6Sopenharmony_ci return true; 70c29fa5a6Sopenharmony_ci} 71c29fa5a6Sopenharmony_ci 72c29fa5a6Sopenharmony_civoid TaskScheduler::ProcessTasks() 73c29fa5a6Sopenharmony_ci{ 74c29fa5a6Sopenharmony_ci CALL_DEBUG_ENTER; 75c29fa5a6Sopenharmony_ci std::vector<TaskPtr> tasks; 76c29fa5a6Sopenharmony_ci PopPendingTaskList(tasks); 77c29fa5a6Sopenharmony_ci for (const auto &it : tasks) { 78c29fa5a6Sopenharmony_ci it->ProcessTask(); 79c29fa5a6Sopenharmony_ci } 80c29fa5a6Sopenharmony_ci} 81c29fa5a6Sopenharmony_ci 82c29fa5a6Sopenharmony_ciint32_t TaskScheduler::PostSyncTask(DTaskCallback cb) 83c29fa5a6Sopenharmony_ci{ 84c29fa5a6Sopenharmony_ci CALL_DEBUG_ENTER; 85c29fa5a6Sopenharmony_ci CHKPR(cb, ERROR_NULL_POINTER); 86c29fa5a6Sopenharmony_ci if (IsCallFromWorkerThread()) { 87c29fa5a6Sopenharmony_ci return cb(); 88c29fa5a6Sopenharmony_ci } 89c29fa5a6Sopenharmony_ci Promise promise; 90c29fa5a6Sopenharmony_ci Future future = promise.get_future(); 91c29fa5a6Sopenharmony_ci auto task = PostTask(cb, &promise); 92c29fa5a6Sopenharmony_ci CHKPR(task, ETASKS_POST_SYNCTASK_FAIL); 93c29fa5a6Sopenharmony_ci 94c29fa5a6Sopenharmony_ci static constexpr int32_t timeout = 3000; 95c29fa5a6Sopenharmony_ci std::chrono::milliseconds span(timeout); 96c29fa5a6Sopenharmony_ci auto res = future.wait_for(span); 97c29fa5a6Sopenharmony_ci task->SetWaited(); 98c29fa5a6Sopenharmony_ci if (res == std::future_status::timeout) { 99c29fa5a6Sopenharmony_ci FI_HILOGE("Task timeout"); 100c29fa5a6Sopenharmony_ci return ETASKS_WAIT_TIMEOUT; 101c29fa5a6Sopenharmony_ci } else if (res == std::future_status::deferred) { 102c29fa5a6Sopenharmony_ci FI_HILOGE("Task deferred"); 103c29fa5a6Sopenharmony_ci return ETASKS_WAIT_DEFERRED; 104c29fa5a6Sopenharmony_ci } 105c29fa5a6Sopenharmony_ci return future.get(); 106c29fa5a6Sopenharmony_ci} 107c29fa5a6Sopenharmony_ci 108c29fa5a6Sopenharmony_ciint32_t TaskScheduler::PostAsyncTask(DTaskCallback callback) 109c29fa5a6Sopenharmony_ci{ 110c29fa5a6Sopenharmony_ci CHKPR(callback, ERROR_NULL_POINTER); 111c29fa5a6Sopenharmony_ci auto task = PostTask(callback); 112c29fa5a6Sopenharmony_ci CHKPR(task, ETASKS_POST_ASYNCTASK_FAIL); 113c29fa5a6Sopenharmony_ci return RET_OK; 114c29fa5a6Sopenharmony_ci} 115c29fa5a6Sopenharmony_ci 116c29fa5a6Sopenharmony_civoid TaskScheduler::PopPendingTaskList(std::vector<TaskPtr> &tasks) 117c29fa5a6Sopenharmony_ci{ 118c29fa5a6Sopenharmony_ci static constexpr int32_t onceProcessTaskLimit = 10; 119c29fa5a6Sopenharmony_ci std::lock_guard<std::mutex> guard(mux_); 120c29fa5a6Sopenharmony_ci for (int32_t i = 0; i < onceProcessTaskLimit; i++) { 121c29fa5a6Sopenharmony_ci if (tasks_.empty()) { 122c29fa5a6Sopenharmony_ci break; 123c29fa5a6Sopenharmony_ci } 124c29fa5a6Sopenharmony_ci auto firstTask = tasks_.front(); 125c29fa5a6Sopenharmony_ci CHKPB(firstTask); 126c29fa5a6Sopenharmony_ci RecoveryId(firstTask->GetId()); 127c29fa5a6Sopenharmony_ci tasks.push_back(firstTask->GetSharedPtr()); 128c29fa5a6Sopenharmony_ci tasks_.pop(); 129c29fa5a6Sopenharmony_ci } 130c29fa5a6Sopenharmony_ci} 131c29fa5a6Sopenharmony_ci 132c29fa5a6Sopenharmony_ciTaskScheduler::TaskPtr TaskScheduler::PostTask(DTaskCallback callback, Promise *promise) 133c29fa5a6Sopenharmony_ci{ 134c29fa5a6Sopenharmony_ci FI_HILOGD("tasks_ size:%{public}zu", tasks_.size()); 135c29fa5a6Sopenharmony_ci static constexpr int32_t maxTasksLimit = 1000; 136c29fa5a6Sopenharmony_ci std::lock_guard<std::mutex> guard(mux_); 137c29fa5a6Sopenharmony_ci size_t tsize = tasks_.size(); 138c29fa5a6Sopenharmony_ci if (tsize > maxTasksLimit) { 139c29fa5a6Sopenharmony_ci FI_HILOGE("The task queue is full, size:%{public}zu/%{public}d", tsize, maxTasksLimit); 140c29fa5a6Sopenharmony_ci return nullptr; 141c29fa5a6Sopenharmony_ci } 142c29fa5a6Sopenharmony_ci int32_t id = GenerateId(); 143c29fa5a6Sopenharmony_ci TaskData data = { GetThisThreadId(), id }; 144c29fa5a6Sopenharmony_ci ssize_t res = write(fds_[1], &data, sizeof(data)); 145c29fa5a6Sopenharmony_ci if (res == -1) { 146c29fa5a6Sopenharmony_ci RecoveryId(id); 147c29fa5a6Sopenharmony_ci FI_HILOGE("Pipeline writes failed, errno:%{public}d", errno); 148c29fa5a6Sopenharmony_ci return nullptr; 149c29fa5a6Sopenharmony_ci } 150c29fa5a6Sopenharmony_ci TaskPtr task = std::make_shared<Task>(id, callback, promise); 151c29fa5a6Sopenharmony_ci tasks_.push(task); 152c29fa5a6Sopenharmony_ci std::string taskType = ((promise == nullptr) ? "Async" : "Sync"); 153c29fa5a6Sopenharmony_ci FI_HILOGD("Post %{public}s", taskType.c_str()); 154c29fa5a6Sopenharmony_ci return task->GetSharedPtr(); 155c29fa5a6Sopenharmony_ci} 156c29fa5a6Sopenharmony_ci} // namespace DeviceStatus 157c29fa5a6Sopenharmony_ci} // namespace Msdp 158c29fa5a6Sopenharmony_ci} // namespace OHOS 159