1/*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "schedule_task_manager.h"
17
18#include <ctime>
19#include <fcntl.h>
20#include <mutex>
21#include <pthread.h>
22#include <cstring>
23#include <sys/epoll.h>
24#include <sys/eventfd.h>
25#include <sys/timerfd.h>
26#include <unistd.h>
27
28namespace {
29constexpr int32_t TIME_BASE = 1000; // Time progression rate.
30constexpr int32_t FIRST_TIME = 10; // The start time of the first task is 10 nanoseconds.
31constexpr int32_t EPOLL_EVENT_MAX = 1024;
32} // namespace
33
34ScheduleTaskManager::ScheduleTaskManager()
35{
36    StartThread();
37}
38
39ScheduleTaskManager::~ScheduleTaskManager()
40{
41    Shutdown();
42}
43
44void ScheduleTaskManager::Shutdown()
45{
46    bool expect = true;
47    if (!runScheduleThread_.compare_exchange_strong(expect, false)) {
48        return;
49    }
50    uint64_t value = 1;
51    write(stopFd_, &value, sizeof(value));
52    if (scheduleThread_.joinable()) {
53        scheduleThread_.join();
54    }
55    std::lock_guard<std::mutex> guard(mtx_);
56    for (const auto& [timerFd, func] : tasks_) {
57        close(timerFd);
58    }
59    close(epollFd_);
60    close(stopFd_);
61}
62
63int32_t ScheduleTaskManager::ScheduleTask(const std::function<void(void)>& callback, const uint64_t interval, bool once,
64                                          bool block)
65{
66    int32_t timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
67    if (timerFd == -1) {
68        PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd create failed");
69        return -1;
70    }
71
72    std::function<void(void)> func;
73    struct itimerspec time;
74    if (once) {
75        if (interval == 0) {
76            PROFILER_LOG_ERROR(LOG_CORE, "the interval parameters of a single execution cannot be 0");
77            return -1;
78        }
79        time.it_value.tv_sec = interval / TIME_BASE;
80        time.it_value.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE;
81        time.it_interval.tv_sec = 0;
82        time.it_interval.tv_nsec = 0;
83        func = ([this, timerFd, callback] { this->HandleSingleTask(timerFd, callback); });
84    } else {
85        time.it_value.tv_sec = 0;
86        time.it_value.tv_nsec = FIRST_TIME;
87        time.it_interval.tv_sec = interval / TIME_BASE;
88        time.it_interval.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE;
89        func = callback;
90    }
91
92    int32_t ret = timerfd_settime(timerFd, 0, &time, NULL);
93    if (ret == -1) {
94        PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd settime failed");
95        return -1;
96    }
97
98    struct epoll_event evt;
99    evt.data.fd = timerFd;
100    evt.events = EPOLLIN;
101    epoll_ctl(epollFd_, EPOLL_CTL_ADD, timerFd, &evt);
102    if (block) {
103        std::lock_guard<std::mutex> guard(mtx_);
104        tasks_[timerFd] = std::move(func);
105    } else {
106        tasks_[timerFd] = std::move(func);
107    }
108    return timerFd;
109}
110
111bool ScheduleTaskManager::UnscheduleTask(const int32_t timerFd)
112{
113    std::lock_guard<std::mutex> guard(mtx_);
114    return DeleteTask(timerFd);
115}
116
117bool ScheduleTaskManager::UnscheduleTaskLockless(const int32_t timerFd)
118{
119    return DeleteTask(timerFd);
120}
121
122bool ScheduleTaskManager::DeleteTask(const int32_t timerFd)
123{
124    if (auto iter = tasks_.find(timerFd); iter != tasks_.end()) {
125        tasks_.erase(timerFd);
126        epoll_ctl(epollFd_, EPOLL_CTL_DEL, timerFd, NULL);
127        close(timerFd);
128        return true;
129    }
130    return false;
131}
132
133void ScheduleTaskManager::ScheduleThread()
134{
135    pthread_setname_np(pthread_self(), "SchedTaskMgr");
136    uint64_t exp;
137    while (runScheduleThread_) {
138        struct epoll_event events[EPOLL_EVENT_MAX];
139        int32_t nfd = epoll_wait(epollFd_, events, EPOLL_EVENT_MAX, -1);
140        if (nfd > 0) {
141            for (int32_t i = 0; i < nfd; ++i) {
142                if (events[i].data.fd == stopFd_) {
143                    return;
144                }
145
146                int32_t ret = read(events[i].data.fd, &exp, sizeof(uint64_t));
147                if (ret == sizeof(uint64_t)) {
148                    if (tasks_.find(events[i].data.fd) != tasks_.end() && tasks_[events[i].data.fd] != nullptr) {
149                        auto funcTask = tasks_[events[i].data.fd];
150                        funcTask();
151                    }
152                }
153            }
154        }
155    }
156}
157
158void ScheduleTaskManager::HandleSingleTask(int32_t fd, std::function<void(void)> callback)
159{
160    callback();
161    UnscheduleTaskLockless(fd);
162}
163
164void ScheduleTaskManager::StartThread()
165{
166    epollFd_ = epoll_create(0);
167    stopFd_ = eventfd(0, EFD_NONBLOCK); // Specifically designed for stopping epoll_wait.
168    struct epoll_event evt;
169    evt.data.fd = stopFd_;
170    evt.events = EPOLLIN;
171    epoll_ctl(epollFd_, EPOLL_CTL_ADD, stopFd_, &evt);
172
173    scheduleThread_ = std::thread([this] { this->ScheduleThread(); });
174}