1/*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "delegate_tasks.h"
17
18#include <fcntl.h>
19#include <sys/syscall.h>
20#include <unistd.h>
21
22#include "error_multimodal.h"
23#include "util.h"
24
25#undef MMI_LOG_DOMAIN
26#define MMI_LOG_DOMAIN MMI_LOG_SERVER
27#undef MMI_LOG_TAG
28#define MMI_LOG_TAG "DelegateTasks"
29
30namespace OHOS {
31namespace MMI {
32void DelegateTasks::Task::ProcessTask()
33{
34    CALL_DEBUG_ENTER;
35    if (hasWaited_) {
36        MMI_HILOGE("Expired tasks will be discarded. id:%{public}d", id_);
37        return;
38    }
39    int32_t ret = fun_();
40    std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync");
41    MMI_HILOGD("Process taskType:%{public}s, taskId:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret);
42    if (!hasWaited_ && promise_ != nullptr) {
43        promise_->set_value(ret);
44    }
45}
46
47DelegateTasks::~DelegateTasks()
48{
49    if (fds_[0] >= 0) {
50        close(fds_[0]);
51        fds_[0] = -1;
52    }
53    if (fds_[1] >= 0) {
54        close(fds_[1]);
55        fds_[1] = -1;
56    }
57}
58
59bool DelegateTasks::Init()
60{
61    CALL_DEBUG_ENTER;
62    if (pipe(fds_) == -1) {
63        MMI_HILOGE("The pipe create failed, errno:%{public}d", errno);
64        return false;
65    }
66    if (fcntl(fds_[0], F_SETFL, O_NONBLOCK) == -1) {
67        MMI_HILOGE("The fcntl read failed, errno:%{public}d", errno);
68        close(fds_[0]);
69        return false;
70    }
71    if (fcntl(fds_[1], F_SETFL, O_NONBLOCK) == -1) {
72        MMI_HILOGE("The fcntl write failed, errno:%{public}d", errno);
73        close(fds_[1]);
74        return false;
75    }
76    return true;
77}
78
79void DelegateTasks::ProcessTasks()
80{
81    CALL_DEBUG_ENTER;
82    std::vector<TaskPtr> tasks;
83    PopPendingTaskList(tasks);
84    for (const auto &it : tasks) {
85        it->ProcessTask();
86    }
87}
88
89int32_t DelegateTasks::PostSyncTask(DTaskCallback callback)
90{
91    CALL_DEBUG_ENTER;
92    CHKPR(callback, ERROR_NULL_POINTER);
93    if (IsCallFromWorkerThread()) {
94        return callback();
95    }
96    Promise promise;
97    Future future = promise.get_future();
98    auto task = PostTask(callback, &promise);
99    CHKPR(task, ETASKS_POST_SYNCTASK_FAIL);
100
101    static constexpr int32_t timeout = 3000;
102    std::chrono::milliseconds span(timeout);
103    auto res = future.wait_for(span);
104    task->SetWaited();
105    if (res == std::future_status::timeout) {
106        MMI_HILOGE("Task timeout");
107        return ETASKS_WAIT_TIMEOUT;
108    } else if (res == std::future_status::deferred) {
109        MMI_HILOGE("Task deferred");
110        return ETASKS_WAIT_DEFERRED;
111    }
112    return future.get();
113}
114
115int32_t DelegateTasks::PostAsyncTask(DTaskCallback callback)
116{
117    CHKPR(callback, ERROR_NULL_POINTER);
118    if (IsCallFromWorkerThread()) {
119        return callback();
120    }
121    CHKPR(PostTask(callback), ETASKS_POST_ASYNCTASK_FAIL);
122    return RET_OK;
123}
124
125void DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks)
126{
127    std::lock_guard<std::mutex> guard(mux_);
128    static constexpr int32_t onceProcessTaskLimit = 10;
129    for (int32_t count = 0; count < onceProcessTaskLimit; count++) {
130        if (tasks_.empty()) {
131            break;
132        }
133        auto task = tasks_.front();
134        CHKPB(task);
135        RecoveryId(task->GetId());
136        tasks.push_back(task->GetSharedPtr());
137        tasks_.pop();
138    }
139}
140
141DelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, Promise *promise)
142{
143    if (IsCallFromWorkerThread()) {
144        MMI_HILOGE("This interface cannot be called from a worker thread");
145        return nullptr;
146    }
147    std::lock_guard<std::mutex> guard(mux_);
148    MMI_HILOGD("tasks_ size:%{public}d", static_cast<int32_t>(tasks_.size()));
149    static constexpr int32_t maxTasksLimit = 1000;
150    auto tsize = tasks_.size();
151    if (tsize > maxTasksLimit) {
152        MMI_HILOGE("The task queue is full. size:%{public}zu, maxTasksLimit:%{public}d", tsize, maxTasksLimit);
153        return nullptr;
154    }
155    int32_t id = GenerateId();
156    TaskData data = { GetThisThreadId(), id };
157    auto res = write(fds_[1], &data, sizeof(data));
158    if (res == -1) {
159        RecoveryId(id);
160        MMI_HILOGE("Pipe write failed, errno:%{public}d", errno);
161        return nullptr;
162    }
163    TaskPtr task = std::make_shared<Task>(id, callback, promise);
164    tasks_.push(task);
165    std::string taskType = ((promise == nullptr) ? "Async" : "Sync");
166    MMI_HILOGD("Post taskType:%{public}s", taskType.c_str());
167    return task->GetSharedPtr();
168}
169} // namespace MMI
170} // namespace OHOS