1/*
2 * Copyright (c) 2022-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "delegate_tasks.h"
17
18#include <sys/syscall.h>
19#include <fcntl.h>
20#include <unistd.h>
21
22#include "devicestatus_define.h"
23
24#undef LOG_TAG
25#define LOG_TAG "DelegateTasks"
26
27namespace OHOS {
28namespace Msdp {
29namespace DeviceStatus {
30
31void DelegateTasks::Task::ProcessTask()
32{
33    if (hasWaited_) {
34        FI_HILOGE("Expired task will be discarded, id:%{public}d", id_);
35        return;
36    }
37    int32_t ret = fun_();
38    std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync");
39    FI_HILOGD("process:%{public}s, task id:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret);
40    if (!hasWaited_ && promise_ != nullptr) {
41        promise_->set_value(ret);
42    }
43}
44
45DelegateTasks::~DelegateTasks()
46{
47    if (fds_[0] >= 0) {
48        if (close(fds_[0]) < 0) {
49            FI_HILOGE("Close fds_[0] failed, error:%{public}s, fds_[0]:%{public}d", strerror(errno), fds_[0]);
50        }
51        fds_[0] = -1;
52    }
53    if (fds_[1] >= 0) {
54        if (close(fds_[1]) < 0) {
55            FI_HILOGE("Close fds_[1] failed, error:%{public}s, fds_[1]:%{public}d", strerror(errno), fds_[1]);
56        }
57        fds_[1] = -1;
58    }
59}
60
61bool DelegateTasks::Init()
62{
63    CALL_DEBUG_ENTER;
64    if (::pipe2(fds_, O_CLOEXEC | O_NONBLOCK) != 0) {
65        FI_HILOGE("pipe2 failed, errno:%{public}s", ::strerror(errno));
66        return false;
67    }
68    return true;
69}
70
71void DelegateTasks::ProcessTasks()
72{
73    CALL_DEBUG_ENTER;
74    std::vector<TaskPtr> tasks;
75    PopPendingTaskList(tasks);
76    for (const auto &it : tasks) {
77        it->ProcessTask();
78    }
79}
80
81int32_t DelegateTasks::PostSyncTask(DTaskCallback callback)
82{
83    CALL_DEBUG_ENTER;
84    CHKPR(callback, ERROR_NULL_POINTER);
85    if (IsCallFromWorkerThread()) {
86        return callback();
87    }
88    Promise promise;
89    Future future = promise.get_future();
90    auto task = PostTask(callback, &promise);
91    CHKPR(task, ETASKS_POST_SYNCTASK_FAIL);
92
93    static constexpr int32_t timeout = 3000;
94    std::chrono::milliseconds span(timeout);
95    auto res = future.wait_for(span);
96    task->SetWaited();
97    if (res == std::future_status::timeout) {
98        FI_HILOGE("Task timeout");
99        return ETASKS_WAIT_TIMEOUT;
100    } else if (res == std::future_status::deferred) {
101        FI_HILOGE("Task deferred");
102        return ETASKS_WAIT_DEFERRED;
103    }
104    return future.get();
105}
106
107int32_t DelegateTasks::PostAsyncTask(DTaskCallback callback)
108{
109    CHKPR(callback, ERROR_NULL_POINTER);
110    auto task = PostTask(callback);
111    CHKPR(task, ETASKS_POST_ASYNCTASK_FAIL);
112    return RET_OK;
113}
114
115void DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks)
116{
117    std::lock_guard<std::mutex> guard(mux_);
118    static constexpr int32_t onceProcessTaskLimit = 10;
119    for (int32_t i = 0; i < onceProcessTaskLimit; i++) {
120        if (tasks_.empty()) {
121            break;
122        }
123        auto taskDuty = tasks_.front();
124        CHKPB(taskDuty);
125        RecoveryId(taskDuty->GetId());
126        tasks.push_back(taskDuty->GetSharedPtr());
127        tasks_.pop();
128    }
129}
130
131DelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, Promise *promise)
132{
133    std::lock_guard<std::mutex> guard(mux_);
134    FI_HILOGD("tasks_ size:%{public}zu", tasks_.size());
135    static constexpr int32_t maxTasksLimit = 1000;
136    size_t tsize = tasks_.size();
137    if (tsize > maxTasksLimit) {
138        FI_HILOGE("The task queue is full, size:%{public}zu/%{public}d", tsize, maxTasksLimit);
139        return nullptr;
140    }
141    int32_t id = GenerateId();
142    TaskData data = {GetThisThreadId(), id};
143    ssize_t res = write(fds_[1], &data, sizeof(data));
144    if (res == -1) {
145        RecoveryId(id);
146        FI_HILOGE("Write to pipe failed, errno:%{public}d", errno);
147        return nullptr;
148    }
149    TaskPtr task = std::make_shared<Task>(id, callback, promise);
150    tasks_.push(task);
151    std::string taskType = ((promise == nullptr) ? "Async" : "Sync");
152    FI_HILOGD("TaskType post %{public}s", taskType.c_str());
153    return task->GetSharedPtr();
154}
155} // namespace DeviceStatus
156} // namespace Msdp
157} // namespace OHOS
158