1/* 2 * Copyright (c) 2022-2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "delegate_tasks.h" 17 18#include <sys/syscall.h> 19#include <fcntl.h> 20#include <unistd.h> 21 22#include "devicestatus_define.h" 23 24#undef LOG_TAG 25#define LOG_TAG "DelegateTasks" 26 27namespace OHOS { 28namespace Msdp { 29namespace DeviceStatus { 30 31void DelegateTasks::Task::ProcessTask() 32{ 33 if (hasWaited_) { 34 FI_HILOGE("Expired task will be discarded, id:%{public}d", id_); 35 return; 36 } 37 int32_t ret = fun_(); 38 std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync"); 39 FI_HILOGD("process:%{public}s, task id:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret); 40 if (!hasWaited_ && promise_ != nullptr) { 41 promise_->set_value(ret); 42 } 43} 44 45DelegateTasks::~DelegateTasks() 46{ 47 if (fds_[0] >= 0) { 48 if (close(fds_[0]) < 0) { 49 FI_HILOGE("Close fds_[0] failed, error:%{public}s, fds_[0]:%{public}d", strerror(errno), fds_[0]); 50 } 51 fds_[0] = -1; 52 } 53 if (fds_[1] >= 0) { 54 if (close(fds_[1]) < 0) { 55 FI_HILOGE("Close fds_[1] failed, error:%{public}s, fds_[1]:%{public}d", strerror(errno), fds_[1]); 56 } 57 fds_[1] = -1; 58 } 59} 60 61bool DelegateTasks::Init() 62{ 63 CALL_DEBUG_ENTER; 64 if (::pipe2(fds_, O_CLOEXEC | O_NONBLOCK) != 0) { 65 FI_HILOGE("pipe2 failed, errno:%{public}s", ::strerror(errno)); 66 return false; 67 } 68 return true; 69} 70 71void DelegateTasks::ProcessTasks() 72{ 73 CALL_DEBUG_ENTER; 74 std::vector<TaskPtr> tasks; 75 PopPendingTaskList(tasks); 76 for (const auto &it : tasks) { 77 it->ProcessTask(); 78 } 79} 80 81int32_t DelegateTasks::PostSyncTask(DTaskCallback callback) 82{ 83 CALL_DEBUG_ENTER; 84 CHKPR(callback, ERROR_NULL_POINTER); 85 if (IsCallFromWorkerThread()) { 86 return callback(); 87 } 88 Promise promise; 89 Future future = promise.get_future(); 90 auto task = PostTask(callback, &promise); 91 CHKPR(task, ETASKS_POST_SYNCTASK_FAIL); 92 93 static constexpr int32_t timeout = 3000; 94 std::chrono::milliseconds span(timeout); 95 auto res = future.wait_for(span); 96 task->SetWaited(); 97 if (res == std::future_status::timeout) { 98 FI_HILOGE("Task timeout"); 99 return ETASKS_WAIT_TIMEOUT; 100 } else if (res == std::future_status::deferred) { 101 FI_HILOGE("Task deferred"); 102 return ETASKS_WAIT_DEFERRED; 103 } 104 return future.get(); 105} 106 107int32_t DelegateTasks::PostAsyncTask(DTaskCallback callback) 108{ 109 CHKPR(callback, ERROR_NULL_POINTER); 110 auto task = PostTask(callback); 111 CHKPR(task, ETASKS_POST_ASYNCTASK_FAIL); 112 return RET_OK; 113} 114 115void DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks) 116{ 117 std::lock_guard<std::mutex> guard(mux_); 118 static constexpr int32_t onceProcessTaskLimit = 10; 119 for (int32_t i = 0; i < onceProcessTaskLimit; i++) { 120 if (tasks_.empty()) { 121 break; 122 } 123 auto taskDuty = tasks_.front(); 124 CHKPB(taskDuty); 125 RecoveryId(taskDuty->GetId()); 126 tasks.push_back(taskDuty->GetSharedPtr()); 127 tasks_.pop(); 128 } 129} 130 131DelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, Promise *promise) 132{ 133 std::lock_guard<std::mutex> guard(mux_); 134 FI_HILOGD("tasks_ size:%{public}zu", tasks_.size()); 135 static constexpr int32_t maxTasksLimit = 1000; 136 size_t tsize = tasks_.size(); 137 if (tsize > maxTasksLimit) { 138 FI_HILOGE("The task queue is full, size:%{public}zu/%{public}d", tsize, maxTasksLimit); 139 return nullptr; 140 } 141 int32_t id = GenerateId(); 142 TaskData data = {GetThisThreadId(), id}; 143 ssize_t res = write(fds_[1], &data, sizeof(data)); 144 if (res == -1) { 145 RecoveryId(id); 146 FI_HILOGE("Write to pipe failed, errno:%{public}d", errno); 147 return nullptr; 148 } 149 TaskPtr task = std::make_shared<Task>(id, callback, promise); 150 tasks_.push(task); 151 std::string taskType = ((promise == nullptr) ? "Async" : "Sync"); 152 FI_HILOGD("TaskType post %{public}s", taskType.c_str()); 153 return task->GetSharedPtr(); 154} 155} // namespace DeviceStatus 156} // namespace Msdp 157} // namespace OHOS 158