1/* 2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "schedule_task_manager.h" 17 18#include <ctime> 19#include <fcntl.h> 20#include <mutex> 21#include <pthread.h> 22#include <cstring> 23#include <sys/epoll.h> 24#include <sys/eventfd.h> 25#include <sys/timerfd.h> 26#include <unistd.h> 27 28namespace { 29constexpr int32_t TIME_BASE = 1000; // Time progression rate. 30constexpr int32_t FIRST_TIME = 10; // The start time of the first task is 10 nanoseconds. 31constexpr int32_t EPOLL_EVENT_MAX = 1024; 32} // namespace 33 34ScheduleTaskManager::ScheduleTaskManager() 35{ 36 StartThread(); 37} 38 39ScheduleTaskManager::~ScheduleTaskManager() 40{ 41 Shutdown(); 42} 43 44void ScheduleTaskManager::Shutdown() 45{ 46 bool expect = true; 47 if (!runScheduleThread_.compare_exchange_strong(expect, false)) { 48 return; 49 } 50 uint64_t value = 1; 51 write(stopFd_, &value, sizeof(value)); 52 if (scheduleThread_.joinable()) { 53 scheduleThread_.join(); 54 } 55 std::lock_guard<std::mutex> guard(mtx_); 56 for (const auto& [timerFd, func] : tasks_) { 57 close(timerFd); 58 } 59 close(epollFd_); 60 close(stopFd_); 61} 62 63int32_t ScheduleTaskManager::ScheduleTask(const std::function<void(void)>& callback, const uint64_t interval, bool once, 64 bool block) 65{ 66 int32_t timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); 67 if (timerFd == -1) { 68 PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd create failed"); 69 return -1; 70 } 71 72 std::function<void(void)> func; 73 struct itimerspec time; 74 if (once) { 75 if (interval == 0) { 76 PROFILER_LOG_ERROR(LOG_CORE, "the interval parameters of a single execution cannot be 0"); 77 return -1; 78 } 79 time.it_value.tv_sec = interval / TIME_BASE; 80 time.it_value.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE; 81 time.it_interval.tv_sec = 0; 82 time.it_interval.tv_nsec = 0; 83 func = ([this, timerFd, callback] { this->HandleSingleTask(timerFd, callback); }); 84 } else { 85 time.it_value.tv_sec = 0; 86 time.it_value.tv_nsec = FIRST_TIME; 87 time.it_interval.tv_sec = interval / TIME_BASE; 88 time.it_interval.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE; 89 func = callback; 90 } 91 92 int32_t ret = timerfd_settime(timerFd, 0, &time, NULL); 93 if (ret == -1) { 94 PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd settime failed"); 95 return -1; 96 } 97 98 struct epoll_event evt; 99 evt.data.fd = timerFd; 100 evt.events = EPOLLIN; 101 epoll_ctl(epollFd_, EPOLL_CTL_ADD, timerFd, &evt); 102 if (block) { 103 std::lock_guard<std::mutex> guard(mtx_); 104 tasks_[timerFd] = std::move(func); 105 } else { 106 tasks_[timerFd] = std::move(func); 107 } 108 return timerFd; 109} 110 111bool ScheduleTaskManager::UnscheduleTask(const int32_t timerFd) 112{ 113 std::lock_guard<std::mutex> guard(mtx_); 114 return DeleteTask(timerFd); 115} 116 117bool ScheduleTaskManager::UnscheduleTaskLockless(const int32_t timerFd) 118{ 119 return DeleteTask(timerFd); 120} 121 122bool ScheduleTaskManager::DeleteTask(const int32_t timerFd) 123{ 124 if (auto iter = tasks_.find(timerFd); iter != tasks_.end()) { 125 tasks_.erase(timerFd); 126 epoll_ctl(epollFd_, EPOLL_CTL_DEL, timerFd, NULL); 127 close(timerFd); 128 return true; 129 } 130 return false; 131} 132 133void ScheduleTaskManager::ScheduleThread() 134{ 135 pthread_setname_np(pthread_self(), "SchedTaskMgr"); 136 uint64_t exp; 137 while (runScheduleThread_) { 138 struct epoll_event events[EPOLL_EVENT_MAX]; 139 int32_t nfd = epoll_wait(epollFd_, events, EPOLL_EVENT_MAX, -1); 140 if (nfd > 0) { 141 for (int32_t i = 0; i < nfd; ++i) { 142 if (events[i].data.fd == stopFd_) { 143 return; 144 } 145 146 int32_t ret = read(events[i].data.fd, &exp, sizeof(uint64_t)); 147 if (ret == sizeof(uint64_t)) { 148 if (tasks_.find(events[i].data.fd) != tasks_.end() && tasks_[events[i].data.fd] != nullptr) { 149 auto funcTask = tasks_[events[i].data.fd]; 150 funcTask(); 151 } 152 } 153 } 154 } 155 } 156} 157 158void ScheduleTaskManager::HandleSingleTask(int32_t fd, std::function<void(void)> callback) 159{ 160 callback(); 161 UnscheduleTaskLockless(fd); 162} 163 164void ScheduleTaskManager::StartThread() 165{ 166 epollFd_ = epoll_create(0); 167 stopFd_ = eventfd(0, EFD_NONBLOCK); // Specifically designed for stopping epoll_wait. 168 struct epoll_event evt; 169 evt.data.fd = stopFd_; 170 evt.events = EPOLLIN; 171 epoll_ctl(epollFd_, EPOLL_CTL_ADD, stopFd_, &evt); 172 173 scheduleThread_ = std::thread([this] { this->ScheduleThread(); }); 174}