/*
 * Copyright (c) 2022-2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15f857971dSopenharmony_ci
16f857971dSopenharmony_ci#include "delegate_tasks.h"
17f857971dSopenharmony_ci
18f857971dSopenharmony_ci#include <sys/syscall.h>
19f857971dSopenharmony_ci#include <fcntl.h>
20f857971dSopenharmony_ci#include <unistd.h>
21f857971dSopenharmony_ci
22f857971dSopenharmony_ci#include "devicestatus_define.h"
23f857971dSopenharmony_ci
24f857971dSopenharmony_ci#undef LOG_TAG
25f857971dSopenharmony_ci#define LOG_TAG "DelegateTasks"
26f857971dSopenharmony_ci
27f857971dSopenharmony_cinamespace OHOS {
28f857971dSopenharmony_cinamespace Msdp {
29f857971dSopenharmony_cinamespace DeviceStatus {
30f857971dSopenharmony_ci
31f857971dSopenharmony_civoid DelegateTasks::Task::ProcessTask()
32f857971dSopenharmony_ci{
33f857971dSopenharmony_ci    if (hasWaited_) {
34f857971dSopenharmony_ci        FI_HILOGE("Expired task will be discarded, id:%{public}d", id_);
35f857971dSopenharmony_ci        return;
36f857971dSopenharmony_ci    }
37f857971dSopenharmony_ci    int32_t ret = fun_();
38f857971dSopenharmony_ci    std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync");
39f857971dSopenharmony_ci    FI_HILOGD("process:%{public}s, task id:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret);
40f857971dSopenharmony_ci    if (!hasWaited_ && promise_ != nullptr) {
41f857971dSopenharmony_ci        promise_->set_value(ret);
42f857971dSopenharmony_ci    }
43f857971dSopenharmony_ci}
44f857971dSopenharmony_ci
45f857971dSopenharmony_ciDelegateTasks::~DelegateTasks()
46f857971dSopenharmony_ci{
47f857971dSopenharmony_ci    if (fds_[0] >= 0) {
48f857971dSopenharmony_ci        if (close(fds_[0]) < 0) {
49f857971dSopenharmony_ci            FI_HILOGE("Close fds_[0] failed, error:%{public}s, fds_[0]:%{public}d", strerror(errno), fds_[0]);
50f857971dSopenharmony_ci        }
51f857971dSopenharmony_ci        fds_[0] = -1;
52f857971dSopenharmony_ci    }
53f857971dSopenharmony_ci    if (fds_[1] >= 0) {
54f857971dSopenharmony_ci        if (close(fds_[1]) < 0) {
55f857971dSopenharmony_ci            FI_HILOGE("Close fds_[1] failed, error:%{public}s, fds_[1]:%{public}d", strerror(errno), fds_[1]);
56f857971dSopenharmony_ci        }
57f857971dSopenharmony_ci        fds_[1] = -1;
58f857971dSopenharmony_ci    }
59f857971dSopenharmony_ci}
60f857971dSopenharmony_ci
61f857971dSopenharmony_cibool DelegateTasks::Init()
62f857971dSopenharmony_ci{
63f857971dSopenharmony_ci    CALL_DEBUG_ENTER;
64f857971dSopenharmony_ci    if (::pipe2(fds_, O_CLOEXEC | O_NONBLOCK) != 0) {
65f857971dSopenharmony_ci        FI_HILOGE("pipe2 failed, errno:%{public}s", ::strerror(errno));
66f857971dSopenharmony_ci        return false;
67f857971dSopenharmony_ci    }
68f857971dSopenharmony_ci    return true;
69f857971dSopenharmony_ci}
70f857971dSopenharmony_ci
71f857971dSopenharmony_civoid DelegateTasks::ProcessTasks()
72f857971dSopenharmony_ci{
73f857971dSopenharmony_ci    CALL_DEBUG_ENTER;
74f857971dSopenharmony_ci    std::vector<TaskPtr> tasks;
75f857971dSopenharmony_ci    PopPendingTaskList(tasks);
76f857971dSopenharmony_ci    for (const auto &it : tasks) {
77f857971dSopenharmony_ci        it->ProcessTask();
78f857971dSopenharmony_ci    }
79f857971dSopenharmony_ci}
80f857971dSopenharmony_ci
81f857971dSopenharmony_ciint32_t DelegateTasks::PostSyncTask(DTaskCallback callback)
82f857971dSopenharmony_ci{
83f857971dSopenharmony_ci    CALL_DEBUG_ENTER;
84f857971dSopenharmony_ci    CHKPR(callback, ERROR_NULL_POINTER);
85f857971dSopenharmony_ci    if (IsCallFromWorkerThread()) {
86f857971dSopenharmony_ci        return callback();
87f857971dSopenharmony_ci    }
88f857971dSopenharmony_ci    Promise promise;
89f857971dSopenharmony_ci    Future future = promise.get_future();
90f857971dSopenharmony_ci    auto task = PostTask(callback, &promise);
91f857971dSopenharmony_ci    CHKPR(task, ETASKS_POST_SYNCTASK_FAIL);
92f857971dSopenharmony_ci
93f857971dSopenharmony_ci    static constexpr int32_t timeout = 3000;
94f857971dSopenharmony_ci    std::chrono::milliseconds span(timeout);
95f857971dSopenharmony_ci    auto res = future.wait_for(span);
96f857971dSopenharmony_ci    task->SetWaited();
97f857971dSopenharmony_ci    if (res == std::future_status::timeout) {
98f857971dSopenharmony_ci        FI_HILOGE("Task timeout");
99f857971dSopenharmony_ci        return ETASKS_WAIT_TIMEOUT;
100f857971dSopenharmony_ci    } else if (res == std::future_status::deferred) {
101f857971dSopenharmony_ci        FI_HILOGE("Task deferred");
102f857971dSopenharmony_ci        return ETASKS_WAIT_DEFERRED;
103f857971dSopenharmony_ci    }
104f857971dSopenharmony_ci    return future.get();
105f857971dSopenharmony_ci}
106f857971dSopenharmony_ci
107f857971dSopenharmony_ciint32_t DelegateTasks::PostAsyncTask(DTaskCallback callback)
108f857971dSopenharmony_ci{
109f857971dSopenharmony_ci    CHKPR(callback, ERROR_NULL_POINTER);
110f857971dSopenharmony_ci    auto task = PostTask(callback);
111f857971dSopenharmony_ci    CHKPR(task, ETASKS_POST_ASYNCTASK_FAIL);
112f857971dSopenharmony_ci    return RET_OK;
113f857971dSopenharmony_ci}
114f857971dSopenharmony_ci
115f857971dSopenharmony_civoid DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks)
116f857971dSopenharmony_ci{
117f857971dSopenharmony_ci    std::lock_guard<std::mutex> guard(mux_);
118f857971dSopenharmony_ci    static constexpr int32_t onceProcessTaskLimit = 10;
119f857971dSopenharmony_ci    for (int32_t i = 0; i < onceProcessTaskLimit; i++) {
120f857971dSopenharmony_ci        if (tasks_.empty()) {
121f857971dSopenharmony_ci            break;
122f857971dSopenharmony_ci        }
123f857971dSopenharmony_ci        auto taskDuty = tasks_.front();
124f857971dSopenharmony_ci        CHKPB(taskDuty);
125f857971dSopenharmony_ci        RecoveryId(taskDuty->GetId());
126f857971dSopenharmony_ci        tasks.push_back(taskDuty->GetSharedPtr());
127f857971dSopenharmony_ci        tasks_.pop();
128f857971dSopenharmony_ci    }
129f857971dSopenharmony_ci}
130f857971dSopenharmony_ci
131f857971dSopenharmony_ciDelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, Promise *promise)
132f857971dSopenharmony_ci{
133f857971dSopenharmony_ci    std::lock_guard<std::mutex> guard(mux_);
134f857971dSopenharmony_ci    FI_HILOGD("tasks_ size:%{public}zu", tasks_.size());
135f857971dSopenharmony_ci    static constexpr int32_t maxTasksLimit = 1000;
136f857971dSopenharmony_ci    size_t tsize = tasks_.size();
137f857971dSopenharmony_ci    if (tsize > maxTasksLimit) {
138f857971dSopenharmony_ci        FI_HILOGE("The task queue is full, size:%{public}zu/%{public}d", tsize, maxTasksLimit);
139f857971dSopenharmony_ci        return nullptr;
140f857971dSopenharmony_ci    }
141f857971dSopenharmony_ci    int32_t id = GenerateId();
142f857971dSopenharmony_ci    TaskData data = {GetThisThreadId(), id};
143f857971dSopenharmony_ci    ssize_t res = write(fds_[1], &data, sizeof(data));
144f857971dSopenharmony_ci    if (res == -1) {
145f857971dSopenharmony_ci        RecoveryId(id);
146f857971dSopenharmony_ci        FI_HILOGE("Write to pipe failed, errno:%{public}d", errno);
147f857971dSopenharmony_ci        return nullptr;
148f857971dSopenharmony_ci    }
149f857971dSopenharmony_ci    TaskPtr task = std::make_shared<Task>(id, callback, promise);
150f857971dSopenharmony_ci    tasks_.push(task);
151f857971dSopenharmony_ci    std::string taskType = ((promise == nullptr) ? "Async" : "Sync");
152f857971dSopenharmony_ci    FI_HILOGD("TaskType post %{public}s", taskType.c_str());
153f857971dSopenharmony_ci    return task->GetSharedPtr();
154f857971dSopenharmony_ci}
155f857971dSopenharmony_ci} // namespace DeviceStatus
156f857971dSopenharmony_ci} // namespace Msdp
157f857971dSopenharmony_ci} // namespace OHOS
158