1/* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "delegate_tasks.h" 17 18#include <fcntl.h> 19#include <sys/syscall.h> 20#include <unistd.h> 21 22#include "error_multimodal.h" 23#include "util.h" 24 25#undef MMI_LOG_DOMAIN 26#define MMI_LOG_DOMAIN MMI_LOG_SERVER 27#undef MMI_LOG_TAG 28#define MMI_LOG_TAG "DelegateTasks" 29 30namespace OHOS { 31namespace MMI { 32void DelegateTasks::Task::ProcessTask() 33{ 34 CALL_DEBUG_ENTER; 35 if (hasWaited_) { 36 MMI_HILOGE("Expired tasks will be discarded. id:%{public}d", id_); 37 return; 38 } 39 int32_t ret = fun_(); 40 std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync"); 41 MMI_HILOGD("Process taskType:%{public}s, taskId:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret); 42 if (!hasWaited_ && promise_ != nullptr) { 43 promise_->set_value(ret); 44 } 45} 46 47DelegateTasks::~DelegateTasks() 48{ 49 if (fds_[0] >= 0) { 50 close(fds_[0]); 51 fds_[0] = -1; 52 } 53 if (fds_[1] >= 0) { 54 close(fds_[1]); 55 fds_[1] = -1; 56 } 57} 58 59bool DelegateTasks::Init() 60{ 61 CALL_DEBUG_ENTER; 62 if (pipe(fds_) == -1) { 63 MMI_HILOGE("The pipe create failed, errno:%{public}d", errno); 64 return false; 65 } 66 if (fcntl(fds_[0], F_SETFL, O_NONBLOCK) == -1) { 67 MMI_HILOGE("The fcntl read failed, errno:%{public}d", errno); 68 close(fds_[0]); 69 return false; 70 } 71 if (fcntl(fds_[1], F_SETFL, O_NONBLOCK) == -1) { 72 MMI_HILOGE("The fcntl write failed, errno:%{public}d", errno); 73 close(fds_[1]); 74 return false; 75 } 76 return true; 77} 78 79void DelegateTasks::ProcessTasks() 80{ 81 CALL_DEBUG_ENTER; 82 std::vector<TaskPtr> tasks; 83 PopPendingTaskList(tasks); 84 for (const auto &it : tasks) { 85 it->ProcessTask(); 86 } 87} 88 89int32_t DelegateTasks::PostSyncTask(DTaskCallback callback) 90{ 91 CALL_DEBUG_ENTER; 92 CHKPR(callback, ERROR_NULL_POINTER); 93 if (IsCallFromWorkerThread()) { 94 return callback(); 95 } 96 Promise promise; 97 Future future = promise.get_future(); 98 auto task = PostTask(callback, &promise); 99 CHKPR(task, ETASKS_POST_SYNCTASK_FAIL); 100 101 static constexpr int32_t timeout = 3000; 102 std::chrono::milliseconds span(timeout); 103 auto res = future.wait_for(span); 104 task->SetWaited(); 105 if (res == std::future_status::timeout) { 106 MMI_HILOGE("Task timeout"); 107 return ETASKS_WAIT_TIMEOUT; 108 } else if (res == std::future_status::deferred) { 109 MMI_HILOGE("Task deferred"); 110 return ETASKS_WAIT_DEFERRED; 111 } 112 return future.get(); 113} 114 115int32_t DelegateTasks::PostAsyncTask(DTaskCallback callback) 116{ 117 CHKPR(callback, ERROR_NULL_POINTER); 118 if (IsCallFromWorkerThread()) { 119 return callback(); 120 } 121 CHKPR(PostTask(callback), ETASKS_POST_ASYNCTASK_FAIL); 122 return RET_OK; 123} 124 125void DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks) 126{ 127 std::lock_guard<std::mutex> guard(mux_); 128 static constexpr int32_t onceProcessTaskLimit = 10; 129 for (int32_t count = 0; count < onceProcessTaskLimit; count++) { 130 if (tasks_.empty()) { 131 break; 132 } 133 auto task = tasks_.front(); 134 CHKPB(task); 135 RecoveryId(task->GetId()); 136 tasks.push_back(task->GetSharedPtr()); 137 tasks_.pop(); 138 } 139} 140 141DelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, Promise *promise) 142{ 143 if (IsCallFromWorkerThread()) { 144 MMI_HILOGE("This interface cannot be called from a worker thread"); 145 return nullptr; 146 } 147 std::lock_guard<std::mutex> guard(mux_); 148 MMI_HILOGD("tasks_ size:%{public}d", static_cast<int32_t>(tasks_.size())); 149 static constexpr int32_t maxTasksLimit = 1000; 150 auto tsize = tasks_.size(); 151 if (tsize > maxTasksLimit) { 152 MMI_HILOGE("The task queue is full. size:%{public}zu, maxTasksLimit:%{public}d", tsize, maxTasksLimit); 153 return nullptr; 154 } 155 int32_t id = GenerateId(); 156 TaskData data = { GetThisThreadId(), id }; 157 auto res = write(fds_[1], &data, sizeof(data)); 158 if (res == -1) { 159 RecoveryId(id); 160 MMI_HILOGE("Pipe write failed, errno:%{public}d", errno); 161 return nullptr; 162 } 163 TaskPtr task = std::make_shared<Task>(id, callback, promise); 164 tasks_.push(task); 165 std::string taskType = ((promise == nullptr) ? "Async" : "Sync"); 166 MMI_HILOGD("Post taskType:%{public}s", taskType.c_str()); 167 return task->GetSharedPtr(); 168} 169} // namespace MMI 170} // namespace OHOS