106f6ba60Sopenharmony_ci/*
206f6ba60Sopenharmony_ci * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
306f6ba60Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
406f6ba60Sopenharmony_ci * you may not use this file except in compliance with the License.
506f6ba60Sopenharmony_ci * You may obtain a copy of the License at
606f6ba60Sopenharmony_ci *
706f6ba60Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0
806f6ba60Sopenharmony_ci *
906f6ba60Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
1006f6ba60Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
1106f6ba60Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1206f6ba60Sopenharmony_ci * See the License for the specific language governing permissions and
1306f6ba60Sopenharmony_ci * limitations under the License.
1406f6ba60Sopenharmony_ci */
1506f6ba60Sopenharmony_ci
#include "schedule_task_manager.h"

#include <cerrno>
#include <cstring>
#include <ctime>
#include <fcntl.h>
#include <mutex>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <unistd.h>
2706f6ba60Sopenharmony_ci
namespace {
// Ratio between two adjacent time units. ScheduleTask() divides a millisecond
// interval by it to get whole seconds, and multiplies the millisecond remainder
// by it twice to get nanoseconds.
constexpr int32_t TIME_BASE{1000};
// Delay, in nanoseconds, before a periodic task expires for the first time.
constexpr int32_t FIRST_TIME{10};
// Capacity of the event buffer handed to a single epoll_wait() call.
constexpr int32_t EPOLL_EVENT_MAX{1024};
} // namespace
3306f6ba60Sopenharmony_ci
3406f6ba60Sopenharmony_ciScheduleTaskManager::ScheduleTaskManager()
3506f6ba60Sopenharmony_ci{
3606f6ba60Sopenharmony_ci    StartThread();
3706f6ba60Sopenharmony_ci}
3806f6ba60Sopenharmony_ci
3906f6ba60Sopenharmony_ciScheduleTaskManager::~ScheduleTaskManager()
4006f6ba60Sopenharmony_ci{
4106f6ba60Sopenharmony_ci    Shutdown();
4206f6ba60Sopenharmony_ci}
4306f6ba60Sopenharmony_ci
4406f6ba60Sopenharmony_civoid ScheduleTaskManager::Shutdown()
4506f6ba60Sopenharmony_ci{
4606f6ba60Sopenharmony_ci    bool expect = true;
4706f6ba60Sopenharmony_ci    if (!runScheduleThread_.compare_exchange_strong(expect, false)) {
4806f6ba60Sopenharmony_ci        return;
4906f6ba60Sopenharmony_ci    }
5006f6ba60Sopenharmony_ci    uint64_t value = 1;
5106f6ba60Sopenharmony_ci    write(stopFd_, &value, sizeof(value));
5206f6ba60Sopenharmony_ci    if (scheduleThread_.joinable()) {
5306f6ba60Sopenharmony_ci        scheduleThread_.join();
5406f6ba60Sopenharmony_ci    }
5506f6ba60Sopenharmony_ci    std::lock_guard<std::mutex> guard(mtx_);
5606f6ba60Sopenharmony_ci    for (const auto& [timerFd, func] : tasks_) {
5706f6ba60Sopenharmony_ci        close(timerFd);
5806f6ba60Sopenharmony_ci    }
5906f6ba60Sopenharmony_ci    close(epollFd_);
6006f6ba60Sopenharmony_ci    close(stopFd_);
6106f6ba60Sopenharmony_ci}
6206f6ba60Sopenharmony_ci
6306f6ba60Sopenharmony_ciint32_t ScheduleTaskManager::ScheduleTask(const std::function<void(void)>& callback, const uint64_t interval, bool once,
6406f6ba60Sopenharmony_ci                                          bool block)
6506f6ba60Sopenharmony_ci{
6606f6ba60Sopenharmony_ci    int32_t timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
6706f6ba60Sopenharmony_ci    if (timerFd == -1) {
6806f6ba60Sopenharmony_ci        PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd create failed");
6906f6ba60Sopenharmony_ci        return -1;
7006f6ba60Sopenharmony_ci    }
7106f6ba60Sopenharmony_ci
7206f6ba60Sopenharmony_ci    std::function<void(void)> func;
7306f6ba60Sopenharmony_ci    struct itimerspec time;
7406f6ba60Sopenharmony_ci    if (once) {
7506f6ba60Sopenharmony_ci        if (interval == 0) {
7606f6ba60Sopenharmony_ci            PROFILER_LOG_ERROR(LOG_CORE, "the interval parameters of a single execution cannot be 0");
7706f6ba60Sopenharmony_ci            return -1;
7806f6ba60Sopenharmony_ci        }
7906f6ba60Sopenharmony_ci        time.it_value.tv_sec = interval / TIME_BASE;
8006f6ba60Sopenharmony_ci        time.it_value.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE;
8106f6ba60Sopenharmony_ci        time.it_interval.tv_sec = 0;
8206f6ba60Sopenharmony_ci        time.it_interval.tv_nsec = 0;
8306f6ba60Sopenharmony_ci        func = ([this, timerFd, callback] { this->HandleSingleTask(timerFd, callback); });
8406f6ba60Sopenharmony_ci    } else {
8506f6ba60Sopenharmony_ci        time.it_value.tv_sec = 0;
8606f6ba60Sopenharmony_ci        time.it_value.tv_nsec = FIRST_TIME;
8706f6ba60Sopenharmony_ci        time.it_interval.tv_sec = interval / TIME_BASE;
8806f6ba60Sopenharmony_ci        time.it_interval.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE;
8906f6ba60Sopenharmony_ci        func = callback;
9006f6ba60Sopenharmony_ci    }
9106f6ba60Sopenharmony_ci
9206f6ba60Sopenharmony_ci    int32_t ret = timerfd_settime(timerFd, 0, &time, NULL);
9306f6ba60Sopenharmony_ci    if (ret == -1) {
9406f6ba60Sopenharmony_ci        PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd settime failed");
9506f6ba60Sopenharmony_ci        return -1;
9606f6ba60Sopenharmony_ci    }
9706f6ba60Sopenharmony_ci
9806f6ba60Sopenharmony_ci    struct epoll_event evt;
9906f6ba60Sopenharmony_ci    evt.data.fd = timerFd;
10006f6ba60Sopenharmony_ci    evt.events = EPOLLIN;
10106f6ba60Sopenharmony_ci    epoll_ctl(epollFd_, EPOLL_CTL_ADD, timerFd, &evt);
10206f6ba60Sopenharmony_ci    if (block) {
10306f6ba60Sopenharmony_ci        std::lock_guard<std::mutex> guard(mtx_);
10406f6ba60Sopenharmony_ci        tasks_[timerFd] = std::move(func);
10506f6ba60Sopenharmony_ci    } else {
10606f6ba60Sopenharmony_ci        tasks_[timerFd] = std::move(func);
10706f6ba60Sopenharmony_ci    }
10806f6ba60Sopenharmony_ci    return timerFd;
10906f6ba60Sopenharmony_ci}
11006f6ba60Sopenharmony_ci
11106f6ba60Sopenharmony_cibool ScheduleTaskManager::UnscheduleTask(const int32_t timerFd)
11206f6ba60Sopenharmony_ci{
11306f6ba60Sopenharmony_ci    std::lock_guard<std::mutex> guard(mtx_);
11406f6ba60Sopenharmony_ci    return DeleteTask(timerFd);
11506f6ba60Sopenharmony_ci}
11606f6ba60Sopenharmony_ci
11706f6ba60Sopenharmony_cibool ScheduleTaskManager::UnscheduleTaskLockless(const int32_t timerFd)
11806f6ba60Sopenharmony_ci{
11906f6ba60Sopenharmony_ci    return DeleteTask(timerFd);
12006f6ba60Sopenharmony_ci}
12106f6ba60Sopenharmony_ci
12206f6ba60Sopenharmony_cibool ScheduleTaskManager::DeleteTask(const int32_t timerFd)
12306f6ba60Sopenharmony_ci{
12406f6ba60Sopenharmony_ci    if (auto iter = tasks_.find(timerFd); iter != tasks_.end()) {
12506f6ba60Sopenharmony_ci        tasks_.erase(timerFd);
12606f6ba60Sopenharmony_ci        epoll_ctl(epollFd_, EPOLL_CTL_DEL, timerFd, NULL);
12706f6ba60Sopenharmony_ci        close(timerFd);
12806f6ba60Sopenharmony_ci        return true;
12906f6ba60Sopenharmony_ci    }
13006f6ba60Sopenharmony_ci    return false;
13106f6ba60Sopenharmony_ci}
13206f6ba60Sopenharmony_ci
13306f6ba60Sopenharmony_civoid ScheduleTaskManager::ScheduleThread()
13406f6ba60Sopenharmony_ci{
13506f6ba60Sopenharmony_ci    pthread_setname_np(pthread_self(), "SchedTaskMgr");
13606f6ba60Sopenharmony_ci    uint64_t exp;
13706f6ba60Sopenharmony_ci    while (runScheduleThread_) {
13806f6ba60Sopenharmony_ci        struct epoll_event events[EPOLL_EVENT_MAX];
13906f6ba60Sopenharmony_ci        int32_t nfd = epoll_wait(epollFd_, events, EPOLL_EVENT_MAX, -1);
14006f6ba60Sopenharmony_ci        if (nfd > 0) {
14106f6ba60Sopenharmony_ci            for (int32_t i = 0; i < nfd; ++i) {
14206f6ba60Sopenharmony_ci                if (events[i].data.fd == stopFd_) {
14306f6ba60Sopenharmony_ci                    return;
14406f6ba60Sopenharmony_ci                }
14506f6ba60Sopenharmony_ci
14606f6ba60Sopenharmony_ci                int32_t ret = read(events[i].data.fd, &exp, sizeof(uint64_t));
14706f6ba60Sopenharmony_ci                if (ret == sizeof(uint64_t)) {
14806f6ba60Sopenharmony_ci                    if (tasks_.find(events[i].data.fd) != tasks_.end() && tasks_[events[i].data.fd] != nullptr) {
14906f6ba60Sopenharmony_ci                        auto funcTask = tasks_[events[i].data.fd];
15006f6ba60Sopenharmony_ci                        funcTask();
15106f6ba60Sopenharmony_ci                    }
15206f6ba60Sopenharmony_ci                }
15306f6ba60Sopenharmony_ci            }
15406f6ba60Sopenharmony_ci        }
15506f6ba60Sopenharmony_ci    }
15606f6ba60Sopenharmony_ci}
15706f6ba60Sopenharmony_ci
15806f6ba60Sopenharmony_civoid ScheduleTaskManager::HandleSingleTask(int32_t fd, std::function<void(void)> callback)
15906f6ba60Sopenharmony_ci{
16006f6ba60Sopenharmony_ci    callback();
16106f6ba60Sopenharmony_ci    UnscheduleTaskLockless(fd);
16206f6ba60Sopenharmony_ci}
16306f6ba60Sopenharmony_ci
16406f6ba60Sopenharmony_civoid ScheduleTaskManager::StartThread()
16506f6ba60Sopenharmony_ci{
16606f6ba60Sopenharmony_ci    epollFd_ = epoll_create(0);
16706f6ba60Sopenharmony_ci    stopFd_ = eventfd(0, EFD_NONBLOCK); // Specifically designed for stopping epoll_wait.
16806f6ba60Sopenharmony_ci    struct epoll_event evt;
16906f6ba60Sopenharmony_ci    evt.data.fd = stopFd_;
17006f6ba60Sopenharmony_ci    evt.events = EPOLLIN;
17106f6ba60Sopenharmony_ci    epoll_ctl(epollFd_, EPOLL_CTL_ADD, stopFd_, &evt);
17206f6ba60Sopenharmony_ci
17306f6ba60Sopenharmony_ci    scheduleThread_ = std::thread([this] { this->ScheduleThread(); });
17406f6ba60Sopenharmony_ci}