1f857971dSopenharmony_ci/* 2f857971dSopenharmony_ci * Copyright (c) 2022-2023 Huawei Device Co., Ltd. 3f857971dSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4f857971dSopenharmony_ci * you may not use this file except in compliance with the License. 5f857971dSopenharmony_ci * You may obtain a copy of the License at 6f857971dSopenharmony_ci * 7f857971dSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8f857971dSopenharmony_ci * 9f857971dSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10f857971dSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11f857971dSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12f857971dSopenharmony_ci * See the License for the specific language governing permissions and 13f857971dSopenharmony_ci * limitations under the License. 14f857971dSopenharmony_ci */ 15f857971dSopenharmony_ci 16f857971dSopenharmony_ci#include "delegate_tasks.h" 17f857971dSopenharmony_ci 18f857971dSopenharmony_ci#include <sys/syscall.h> 19f857971dSopenharmony_ci#include <fcntl.h> 20f857971dSopenharmony_ci#include <unistd.h> 21f857971dSopenharmony_ci 22f857971dSopenharmony_ci#include "devicestatus_define.h" 23f857971dSopenharmony_ci 24f857971dSopenharmony_ci#undef LOG_TAG 25f857971dSopenharmony_ci#define LOG_TAG "DelegateTasks" 26f857971dSopenharmony_ci 27f857971dSopenharmony_cinamespace OHOS { 28f857971dSopenharmony_cinamespace Msdp { 29f857971dSopenharmony_cinamespace DeviceStatus { 30f857971dSopenharmony_ci 31f857971dSopenharmony_civoid DelegateTasks::Task::ProcessTask() 32f857971dSopenharmony_ci{ 33f857971dSopenharmony_ci if (hasWaited_) { 34f857971dSopenharmony_ci FI_HILOGE("Expired task will be discarded, id:%{public}d", id_); 35f857971dSopenharmony_ci return; 36f857971dSopenharmony_ci } 37f857971dSopenharmony_ci int32_t ret = fun_(); 38f857971dSopenharmony_ci std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync"); 39f857971dSopenharmony_ci FI_HILOGD("process:%{public}s, task id:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret); 40f857971dSopenharmony_ci if (!hasWaited_ && promise_ != nullptr) { 41f857971dSopenharmony_ci promise_->set_value(ret); 42f857971dSopenharmony_ci } 43f857971dSopenharmony_ci} 44f857971dSopenharmony_ci 45f857971dSopenharmony_ciDelegateTasks::~DelegateTasks() 46f857971dSopenharmony_ci{ 47f857971dSopenharmony_ci if (fds_[0] >= 0) { 48f857971dSopenharmony_ci if (close(fds_[0]) < 0) { 49f857971dSopenharmony_ci FI_HILOGE("Close fds_[0] failed, error:%{public}s, fds_[0]:%{public}d", strerror(errno), fds_[0]); 50f857971dSopenharmony_ci } 51f857971dSopenharmony_ci fds_[0] = -1; 52f857971dSopenharmony_ci } 53f857971dSopenharmony_ci if (fds_[1] >= 0) { 54f857971dSopenharmony_ci if (close(fds_[1]) < 0) { 55f857971dSopenharmony_ci FI_HILOGE("Close fds_[1] failed, error:%{public}s, fds_[1]:%{public}d", strerror(errno), fds_[1]); 56f857971dSopenharmony_ci } 57f857971dSopenharmony_ci fds_[1] = -1; 58f857971dSopenharmony_ci } 59f857971dSopenharmony_ci} 60f857971dSopenharmony_ci 61f857971dSopenharmony_cibool DelegateTasks::Init() 62f857971dSopenharmony_ci{ 63f857971dSopenharmony_ci CALL_DEBUG_ENTER; 64f857971dSopenharmony_ci if (::pipe2(fds_, O_CLOEXEC | O_NONBLOCK) != 0) { 65f857971dSopenharmony_ci FI_HILOGE("pipe2 failed, errno:%{public}s", ::strerror(errno)); 66f857971dSopenharmony_ci return false; 67f857971dSopenharmony_ci } 68f857971dSopenharmony_ci return true; 69f857971dSopenharmony_ci} 70f857971dSopenharmony_ci 71f857971dSopenharmony_civoid DelegateTasks::ProcessTasks() 72f857971dSopenharmony_ci{ 73f857971dSopenharmony_ci CALL_DEBUG_ENTER; 74f857971dSopenharmony_ci std::vector<TaskPtr> tasks; 75f857971dSopenharmony_ci PopPendingTaskList(tasks); 76f857971dSopenharmony_ci for (const auto &it : tasks) { 77f857971dSopenharmony_ci it->ProcessTask(); 78f857971dSopenharmony_ci } 79f857971dSopenharmony_ci} 80f857971dSopenharmony_ci 81f857971dSopenharmony_ciint32_t DelegateTasks::PostSyncTask(DTaskCallback callback) 82f857971dSopenharmony_ci{ 83f857971dSopenharmony_ci CALL_DEBUG_ENTER; 84f857971dSopenharmony_ci CHKPR(callback, ERROR_NULL_POINTER); 85f857971dSopenharmony_ci if (IsCallFromWorkerThread()) { 86f857971dSopenharmony_ci return callback(); 87f857971dSopenharmony_ci } 88f857971dSopenharmony_ci Promise promise; 89f857971dSopenharmony_ci Future future = promise.get_future(); 90f857971dSopenharmony_ci auto task = PostTask(callback, &promise); 91f857971dSopenharmony_ci CHKPR(task, ETASKS_POST_SYNCTASK_FAIL); 92f857971dSopenharmony_ci 93f857971dSopenharmony_ci static constexpr int32_t timeout = 3000; 94f857971dSopenharmony_ci std::chrono::milliseconds span(timeout); 95f857971dSopenharmony_ci auto res = future.wait_for(span); 96f857971dSopenharmony_ci task->SetWaited(); 97f857971dSopenharmony_ci if (res == std::future_status::timeout) { 98f857971dSopenharmony_ci FI_HILOGE("Task timeout"); 99f857971dSopenharmony_ci return ETASKS_WAIT_TIMEOUT; 100f857971dSopenharmony_ci } else if (res == std::future_status::deferred) { 101f857971dSopenharmony_ci FI_HILOGE("Task deferred"); 102f857971dSopenharmony_ci return ETASKS_WAIT_DEFERRED; 103f857971dSopenharmony_ci } 104f857971dSopenharmony_ci return future.get(); 105f857971dSopenharmony_ci} 106f857971dSopenharmony_ci 107f857971dSopenharmony_ciint32_t DelegateTasks::PostAsyncTask(DTaskCallback callback) 108f857971dSopenharmony_ci{ 109f857971dSopenharmony_ci CHKPR(callback, ERROR_NULL_POINTER); 110f857971dSopenharmony_ci auto task = PostTask(callback); 111f857971dSopenharmony_ci CHKPR(task, ETASKS_POST_ASYNCTASK_FAIL); 112f857971dSopenharmony_ci return RET_OK; 113f857971dSopenharmony_ci} 114f857971dSopenharmony_ci 115f857971dSopenharmony_civoid DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks) 116f857971dSopenharmony_ci{ 117f857971dSopenharmony_ci std::lock_guard<std::mutex> guard(mux_); 118f857971dSopenharmony_ci static constexpr int32_t onceProcessTaskLimit = 10; 119f857971dSopenharmony_ci for (int32_t i = 0; i < onceProcessTaskLimit; i++) { 120f857971dSopenharmony_ci if (tasks_.empty()) { 121f857971dSopenharmony_ci break; 122f857971dSopenharmony_ci } 123f857971dSopenharmony_ci auto taskDuty = tasks_.front(); 124f857971dSopenharmony_ci CHKPB(taskDuty); 125f857971dSopenharmony_ci RecoveryId(taskDuty->GetId()); 126f857971dSopenharmony_ci tasks.push_back(taskDuty->GetSharedPtr()); 127f857971dSopenharmony_ci tasks_.pop(); 128f857971dSopenharmony_ci } 129f857971dSopenharmony_ci} 130f857971dSopenharmony_ci 131f857971dSopenharmony_ciDelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, Promise *promise) 132f857971dSopenharmony_ci{ 133f857971dSopenharmony_ci std::lock_guard<std::mutex> guard(mux_); 134f857971dSopenharmony_ci FI_HILOGD("tasks_ size:%{public}zu", tasks_.size()); 135f857971dSopenharmony_ci static constexpr int32_t maxTasksLimit = 1000; 136f857971dSopenharmony_ci size_t tsize = tasks_.size(); 137f857971dSopenharmony_ci if (tsize > maxTasksLimit) { 138f857971dSopenharmony_ci FI_HILOGE("The task queue is full, size:%{public}zu/%{public}d", tsize, maxTasksLimit); 139f857971dSopenharmony_ci return nullptr; 140f857971dSopenharmony_ci } 141f857971dSopenharmony_ci int32_t id = GenerateId(); 142f857971dSopenharmony_ci TaskData data = {GetThisThreadId(), id}; 143f857971dSopenharmony_ci ssize_t res = write(fds_[1], &data, sizeof(data)); 144f857971dSopenharmony_ci if (res == -1) { 145f857971dSopenharmony_ci RecoveryId(id); 146f857971dSopenharmony_ci FI_HILOGE("Write to pipe failed, errno:%{public}d", errno); 147f857971dSopenharmony_ci return nullptr; 148f857971dSopenharmony_ci } 149f857971dSopenharmony_ci TaskPtr task = std::make_shared<Task>(id, callback, promise); 150f857971dSopenharmony_ci tasks_.push(task); 151f857971dSopenharmony_ci std::string taskType = ((promise == nullptr) ? "Async" : "Sync"); 152f857971dSopenharmony_ci FI_HILOGD("TaskType post %{public}s", taskType.c_str()); 153f857971dSopenharmony_ci return task->GetSharedPtr(); 154f857971dSopenharmony_ci} 155f857971dSopenharmony_ci} // namespace DeviceStatus 156f857971dSopenharmony_ci} // namespace Msdp 157f857971dSopenharmony_ci} // namespace OHOS 158