1c29fa5a6Sopenharmony_ci/*
2c29fa5a6Sopenharmony_ci * Copyright (c) 2023 Huawei Device Co., Ltd.
3c29fa5a6Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4c29fa5a6Sopenharmony_ci * you may not use this file except in compliance with the License.
5c29fa5a6Sopenharmony_ci * You may obtain a copy of the License at
6c29fa5a6Sopenharmony_ci *
7c29fa5a6Sopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8c29fa5a6Sopenharmony_ci *
9c29fa5a6Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10c29fa5a6Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11c29fa5a6Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12c29fa5a6Sopenharmony_ci * See the License for the specific language governing permissions and
13c29fa5a6Sopenharmony_ci * limitations under the License.
14c29fa5a6Sopenharmony_ci */
15c29fa5a6Sopenharmony_ci
16c29fa5a6Sopenharmony_ci#include "task_scheduler.h"
17c29fa5a6Sopenharmony_ci
18c29fa5a6Sopenharmony_ci#include <fcntl.h>
19c29fa5a6Sopenharmony_ci#include <sys/syscall.h>
20c29fa5a6Sopenharmony_ci#include <unistd.h>
21c29fa5a6Sopenharmony_ci
22c29fa5a6Sopenharmony_ci#include "devicestatus_define.h"
23c29fa5a6Sopenharmony_ci
24c29fa5a6Sopenharmony_ci#undef LOG_TAG
25c29fa5a6Sopenharmony_ci#define LOG_TAG "TaskScheduler"
26c29fa5a6Sopenharmony_ci
27c29fa5a6Sopenharmony_cinamespace OHOS {
28c29fa5a6Sopenharmony_cinamespace Msdp {
29c29fa5a6Sopenharmony_cinamespace DeviceStatus {
30c29fa5a6Sopenharmony_ci
31c29fa5a6Sopenharmony_civoid TaskScheduler::Task::ProcessTask()
32c29fa5a6Sopenharmony_ci{
33c29fa5a6Sopenharmony_ci    CALL_DEBUG_ENTER;
34c29fa5a6Sopenharmony_ci    if (hasWaited_) {
35c29fa5a6Sopenharmony_ci        FI_HILOGE("Expired tasks will be discarded, id:%{public}d", id_);
36c29fa5a6Sopenharmony_ci        return;
37c29fa5a6Sopenharmony_ci    }
38c29fa5a6Sopenharmony_ci    int32_t ret = fun_();
39c29fa5a6Sopenharmony_ci    std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync");
40c29fa5a6Sopenharmony_ci    FI_HILOGD("process:%{public}s, task id:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret);
41c29fa5a6Sopenharmony_ci    if (!hasWaited_ && promise_ != nullptr) {
42c29fa5a6Sopenharmony_ci        promise_->set_value(ret);
43c29fa5a6Sopenharmony_ci    }
44c29fa5a6Sopenharmony_ci}
45c29fa5a6Sopenharmony_ci
46c29fa5a6Sopenharmony_ciTaskScheduler::~TaskScheduler()
47c29fa5a6Sopenharmony_ci{
48c29fa5a6Sopenharmony_ci    if (fds_[0] >= 0) {
49c29fa5a6Sopenharmony_ci        if (close(fds_[0]) < 0) {
50c29fa5a6Sopenharmony_ci            FI_HILOGE("Close fds_[0] failed, err:%{public}s, fds_[0]:%{public}d", strerror(errno), fds_[0]);
51c29fa5a6Sopenharmony_ci        }
52c29fa5a6Sopenharmony_ci        fds_[0] = -1;
53c29fa5a6Sopenharmony_ci    }
54c29fa5a6Sopenharmony_ci    if (fds_[1] >= 0) {
55c29fa5a6Sopenharmony_ci        if (close(fds_[1]) < 0) {
56c29fa5a6Sopenharmony_ci            FI_HILOGE("Close fds_[1] failed, err:%{public}s, fds_[1]:%{public}d", strerror(errno), fds_[1]);
57c29fa5a6Sopenharmony_ci        }
58c29fa5a6Sopenharmony_ci        fds_[1] = -1;
59c29fa5a6Sopenharmony_ci    }
60c29fa5a6Sopenharmony_ci}
61c29fa5a6Sopenharmony_ci
62c29fa5a6Sopenharmony_cibool TaskScheduler::Init()
63c29fa5a6Sopenharmony_ci{
64c29fa5a6Sopenharmony_ci    CALL_DEBUG_ENTER;
65c29fa5a6Sopenharmony_ci    if (::pipe2(fds_, O_CLOEXEC | O_NONBLOCK) != 0) {
66c29fa5a6Sopenharmony_ci        FI_HILOGE("pipe2 failed, errno:%{public}s", ::strerror(errno));
67c29fa5a6Sopenharmony_ci        return false;
68c29fa5a6Sopenharmony_ci    }
69c29fa5a6Sopenharmony_ci    return true;
70c29fa5a6Sopenharmony_ci}
71c29fa5a6Sopenharmony_ci
72c29fa5a6Sopenharmony_civoid TaskScheduler::ProcessTasks()
73c29fa5a6Sopenharmony_ci{
74c29fa5a6Sopenharmony_ci    CALL_DEBUG_ENTER;
75c29fa5a6Sopenharmony_ci    std::vector<TaskPtr> tasks;
76c29fa5a6Sopenharmony_ci    PopPendingTaskList(tasks);
77c29fa5a6Sopenharmony_ci    for (const auto &it : tasks) {
78c29fa5a6Sopenharmony_ci        it->ProcessTask();
79c29fa5a6Sopenharmony_ci    }
80c29fa5a6Sopenharmony_ci}
81c29fa5a6Sopenharmony_ci
82c29fa5a6Sopenharmony_ciint32_t TaskScheduler::PostSyncTask(DTaskCallback cb)
83c29fa5a6Sopenharmony_ci{
84c29fa5a6Sopenharmony_ci    CALL_DEBUG_ENTER;
85c29fa5a6Sopenharmony_ci    CHKPR(cb, ERROR_NULL_POINTER);
86c29fa5a6Sopenharmony_ci    if (IsCallFromWorkerThread()) {
87c29fa5a6Sopenharmony_ci        return cb();
88c29fa5a6Sopenharmony_ci    }
89c29fa5a6Sopenharmony_ci    Promise promise;
90c29fa5a6Sopenharmony_ci    Future future = promise.get_future();
91c29fa5a6Sopenharmony_ci    auto task = PostTask(cb, &promise);
92c29fa5a6Sopenharmony_ci    CHKPR(task, ETASKS_POST_SYNCTASK_FAIL);
93c29fa5a6Sopenharmony_ci
94c29fa5a6Sopenharmony_ci    static constexpr int32_t timeout = 3000;
95c29fa5a6Sopenharmony_ci    std::chrono::milliseconds span(timeout);
96c29fa5a6Sopenharmony_ci    auto res = future.wait_for(span);
97c29fa5a6Sopenharmony_ci    task->SetWaited();
98c29fa5a6Sopenharmony_ci    if (res == std::future_status::timeout) {
99c29fa5a6Sopenharmony_ci        FI_HILOGE("Task timeout");
100c29fa5a6Sopenharmony_ci        return ETASKS_WAIT_TIMEOUT;
101c29fa5a6Sopenharmony_ci    } else if (res == std::future_status::deferred) {
102c29fa5a6Sopenharmony_ci        FI_HILOGE("Task deferred");
103c29fa5a6Sopenharmony_ci        return ETASKS_WAIT_DEFERRED;
104c29fa5a6Sopenharmony_ci    }
105c29fa5a6Sopenharmony_ci    return future.get();
106c29fa5a6Sopenharmony_ci}
107c29fa5a6Sopenharmony_ci
108c29fa5a6Sopenharmony_ciint32_t TaskScheduler::PostAsyncTask(DTaskCallback callback)
109c29fa5a6Sopenharmony_ci{
110c29fa5a6Sopenharmony_ci    CHKPR(callback, ERROR_NULL_POINTER);
111c29fa5a6Sopenharmony_ci    auto task = PostTask(callback);
112c29fa5a6Sopenharmony_ci    CHKPR(task, ETASKS_POST_ASYNCTASK_FAIL);
113c29fa5a6Sopenharmony_ci    return RET_OK;
114c29fa5a6Sopenharmony_ci}
115c29fa5a6Sopenharmony_ci
116c29fa5a6Sopenharmony_civoid TaskScheduler::PopPendingTaskList(std::vector<TaskPtr> &tasks)
117c29fa5a6Sopenharmony_ci{
118c29fa5a6Sopenharmony_ci    static constexpr int32_t onceProcessTaskLimit = 10;
119c29fa5a6Sopenharmony_ci    std::lock_guard<std::mutex> guard(mux_);
120c29fa5a6Sopenharmony_ci    for (int32_t i = 0; i < onceProcessTaskLimit; i++) {
121c29fa5a6Sopenharmony_ci        if (tasks_.empty()) {
122c29fa5a6Sopenharmony_ci            break;
123c29fa5a6Sopenharmony_ci        }
124c29fa5a6Sopenharmony_ci        auto firstTask = tasks_.front();
125c29fa5a6Sopenharmony_ci        CHKPB(firstTask);
126c29fa5a6Sopenharmony_ci        RecoveryId(firstTask->GetId());
127c29fa5a6Sopenharmony_ci        tasks.push_back(firstTask->GetSharedPtr());
128c29fa5a6Sopenharmony_ci        tasks_.pop();
129c29fa5a6Sopenharmony_ci    }
130c29fa5a6Sopenharmony_ci}
131c29fa5a6Sopenharmony_ci
132c29fa5a6Sopenharmony_ciTaskScheduler::TaskPtr TaskScheduler::PostTask(DTaskCallback callback, Promise *promise)
133c29fa5a6Sopenharmony_ci{
134c29fa5a6Sopenharmony_ci    FI_HILOGD("tasks_ size:%{public}zu", tasks_.size());
135c29fa5a6Sopenharmony_ci    static constexpr int32_t maxTasksLimit = 1000;
136c29fa5a6Sopenharmony_ci    std::lock_guard<std::mutex> guard(mux_);
137c29fa5a6Sopenharmony_ci    size_t tsize = tasks_.size();
138c29fa5a6Sopenharmony_ci    if (tsize > maxTasksLimit) {
139c29fa5a6Sopenharmony_ci        FI_HILOGE("The task queue is full, size:%{public}zu/%{public}d", tsize, maxTasksLimit);
140c29fa5a6Sopenharmony_ci        return nullptr;
141c29fa5a6Sopenharmony_ci    }
142c29fa5a6Sopenharmony_ci    int32_t id = GenerateId();
143c29fa5a6Sopenharmony_ci    TaskData data = { GetThisThreadId(), id };
144c29fa5a6Sopenharmony_ci    ssize_t res = write(fds_[1], &data, sizeof(data));
145c29fa5a6Sopenharmony_ci    if (res == -1) {
146c29fa5a6Sopenharmony_ci        RecoveryId(id);
147c29fa5a6Sopenharmony_ci        FI_HILOGE("Pipeline writes failed, errno:%{public}d", errno);
148c29fa5a6Sopenharmony_ci        return nullptr;
149c29fa5a6Sopenharmony_ci    }
150c29fa5a6Sopenharmony_ci    TaskPtr task = std::make_shared<Task>(id, callback, promise);
151c29fa5a6Sopenharmony_ci    tasks_.push(task);
152c29fa5a6Sopenharmony_ci    std::string taskType = ((promise == nullptr) ? "Async" : "Sync");
153c29fa5a6Sopenharmony_ci    FI_HILOGD("Post %{public}s", taskType.c_str());
154c29fa5a6Sopenharmony_ci    return task->GetSharedPtr();
155c29fa5a6Sopenharmony_ci}
156c29fa5a6Sopenharmony_ci} // namespace DeviceStatus
157c29fa5a6Sopenharmony_ci} // namespace Msdp
158c29fa5a6Sopenharmony_ci} // namespace OHOS
159