1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "schedule_task_manager.h"
17 
18 #include <ctime>
19 #include <fcntl.h>
20 #include <mutex>
21 #include <pthread.h>
22 #include <cstring>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <sys/timerfd.h>
26 #include <unistd.h>
27 
28 namespace {
29 constexpr int32_t TIME_BASE = 1000; // Time progression rate.
30 constexpr int32_t FIRST_TIME = 10; // The start time of the first task is 10 nanoseconds.
31 constexpr int32_t EPOLL_EVENT_MAX = 1024;
32 } // namespace
33 
ScheduleTaskManager()34 ScheduleTaskManager::ScheduleTaskManager()
35 {
36     StartThread();
37 }
38 
~ScheduleTaskManager()39 ScheduleTaskManager::~ScheduleTaskManager()
40 {
41     Shutdown();
42 }
43 
Shutdown()44 void ScheduleTaskManager::Shutdown()
45 {
46     bool expect = true;
47     if (!runScheduleThread_.compare_exchange_strong(expect, false)) {
48         return;
49     }
50     uint64_t value = 1;
51     write(stopFd_, &value, sizeof(value));
52     if (scheduleThread_.joinable()) {
53         scheduleThread_.join();
54     }
55     std::lock_guard<std::mutex> guard(mtx_);
56     for (const auto& [timerFd, func] : tasks_) {
57         close(timerFd);
58     }
59     close(epollFd_);
60     close(stopFd_);
61 }
62 
ScheduleTask(const std::function<void(void)>& callback, const uint64_t interval, bool once, bool block)63 int32_t ScheduleTaskManager::ScheduleTask(const std::function<void(void)>& callback, const uint64_t interval, bool once,
64                                           bool block)
65 {
66     int32_t timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
67     if (timerFd == -1) {
68         PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd create failed");
69         return -1;
70     }
71 
72     std::function<void(void)> func;
73     struct itimerspec time;
74     if (once) {
75         if (interval == 0) {
76             PROFILER_LOG_ERROR(LOG_CORE, "the interval parameters of a single execution cannot be 0");
77             return -1;
78         }
79         time.it_value.tv_sec = interval / TIME_BASE;
80         time.it_value.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE;
81         time.it_interval.tv_sec = 0;
82         time.it_interval.tv_nsec = 0;
83         func = ([this, timerFd, callback] { this->HandleSingleTask(timerFd, callback); });
84     } else {
85         time.it_value.tv_sec = 0;
86         time.it_value.tv_nsec = FIRST_TIME;
87         time.it_interval.tv_sec = interval / TIME_BASE;
88         time.it_interval.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE;
89         func = callback;
90     }
91 
92     int32_t ret = timerfd_settime(timerFd, 0, &time, NULL);
93     if (ret == -1) {
94         PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd settime failed");
95         return -1;
96     }
97 
98     struct epoll_event evt;
99     evt.data.fd = timerFd;
100     evt.events = EPOLLIN;
101     epoll_ctl(epollFd_, EPOLL_CTL_ADD, timerFd, &evt);
102     if (block) {
103         std::lock_guard<std::mutex> guard(mtx_);
104         tasks_[timerFd] = std::move(func);
105     } else {
106         tasks_[timerFd] = std::move(func);
107     }
108     return timerFd;
109 }
110 
UnscheduleTask(const int32_t timerFd)111 bool ScheduleTaskManager::UnscheduleTask(const int32_t timerFd)
112 {
113     std::lock_guard<std::mutex> guard(mtx_);
114     return DeleteTask(timerFd);
115 }
116 
UnscheduleTaskLockless(const int32_t timerFd)117 bool ScheduleTaskManager::UnscheduleTaskLockless(const int32_t timerFd)
118 {
119     return DeleteTask(timerFd);
120 }
121 
DeleteTask(const int32_t timerFd)122 bool ScheduleTaskManager::DeleteTask(const int32_t timerFd)
123 {
124     if (auto iter = tasks_.find(timerFd); iter != tasks_.end()) {
125         tasks_.erase(timerFd);
126         epoll_ctl(epollFd_, EPOLL_CTL_DEL, timerFd, NULL);
127         close(timerFd);
128         return true;
129     }
130     return false;
131 }
132 
ScheduleThread()133 void ScheduleTaskManager::ScheduleThread()
134 {
135     pthread_setname_np(pthread_self(), "SchedTaskMgr");
136     uint64_t exp;
137     while (runScheduleThread_) {
138         struct epoll_event events[EPOLL_EVENT_MAX];
139         int32_t nfd = epoll_wait(epollFd_, events, EPOLL_EVENT_MAX, -1);
140         if (nfd > 0) {
141             for (int32_t i = 0; i < nfd; ++i) {
142                 if (events[i].data.fd == stopFd_) {
143                     return;
144                 }
145 
146                 int32_t ret = read(events[i].data.fd, &exp, sizeof(uint64_t));
147                 if (ret == sizeof(uint64_t)) {
148                     if (tasks_.find(events[i].data.fd) != tasks_.end() && tasks_[events[i].data.fd] != nullptr) {
149                         auto funcTask = tasks_[events[i].data.fd];
150                         funcTask();
151                     }
152                 }
153             }
154         }
155     }
156 }
157 
HandleSingleTask(int32_t fd, std::function<void(void)> callback)158 void ScheduleTaskManager::HandleSingleTask(int32_t fd, std::function<void(void)> callback)
159 {
160     callback();
161     UnscheduleTaskLockless(fd);
162 }
163 
StartThread()164 void ScheduleTaskManager::StartThread()
165 {
166     epollFd_ = epoll_create(0);
167     stopFd_ = eventfd(0, EFD_NONBLOCK); // Specifically designed for stopping epoll_wait.
168     struct epoll_event evt;
169     evt.data.fd = stopFd_;
170     evt.events = EPOLLIN;
171     epoll_ctl(epollFd_, EPOLL_CTL_ADD, stopFd_, &evt);
172 
173     scheduleThread_ = std::thread([this] { this->ScheduleThread(); });
174 }