/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H
#define COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H

#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <thread>

namespace OHOS::NetManagerStandard {
/**
 * Ring of time slots serviced by one background thread.
 *
 * The worker runs every element stored in the current slot (calling T::Execute()), waits up to one
 * second, then advances to the next slot. Put() stores an element DELAYED_COUNT slots ahead of the
 * current one, so it is executed once the worker has advanced that many slots. With
 * DELAYED_COUNT == 0 the element lands in the current slot and is picked up the next time the
 * worker reaches that slot again.
 *
 * Elements are identified by shared_ptr ownership (std::owner_less): putting an element that is
 * already queued moves it to the new slot instead of queuing it twice.
 *
 * @tparam T             element type; must provide a member function Execute()
 * @tparam ARRAY_SIZE    number of slots in the ring in addition to DELAYED_COUNT
 * @tparam DELAYED_COUNT number of slots between the current slot and the slot used by Put()
 */
template <typename T, size_t ARRAY_SIZE, size_t DELAYED_COUNT> class DelayedQueue {
    static_assert(ARRAY_SIZE + DELAYED_COUNT > 0, "DelayedQueue needs at least one slot");

public:
    /** Starts the worker thread; it keeps running until the queue is destroyed. */
    DelayedQueue()
    {
        // Started in the body so that every other member is fully constructed before the worker uses it.
        pthread_ = std::thread([this]() { Run(); });
    }

    /** Stops the worker thread and waits for it to exit. Elements still queued are not executed. */
    ~DelayedQueue()
    {
        {
            // The flag must change under the mutex the worker waits on. Otherwise the worker could
            // test the predicate, miss the notification and sleep for the whole timeout.
            std::lock_guard<std::mutex> needRunLock(needRunMutex_);
            needRun_ = false;
        }
        needRunCondition_.notify_all();
        if (pthread_.joinable()) {
            pthread_.join();
        }
    }

    DelayedQueue(const DelayedQueue &) = delete;
    DelayedQueue &operator=(const DelayedQueue &) = delete;

    /**
     * Queues elem DELAYED_COUNT slots ahead of the current slot.
     * If elem is already queued it is removed from its previous slot first.
     *
     * @param elem element to schedule; a null pointer is stored but never executed
     */
    void Put(const std::shared_ptr<T> &elem)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        const size_t index = (index_ + DELAYED_COUNT) % SLOT_COUNT;
        auto found = indexMap_.find(elem);
        if (found == indexMap_.end()) {
            indexMap_.emplace(elem, index);
        } else {
            if (found->second < elems_.size()) {
                elems_[found->second].erase(elem);
            }
            found->second = index;
        }
        elems_[index].insert(elem);
    }

private:
    using ElemSet = std::set<std::shared_ptr<T>, std::owner_less<std::shared_ptr<T>>>;

    static constexpr size_t SLOT_COUNT = ARRAY_SIZE + DELAYED_COUNT;

    /** Worker loop: run the current slot, wait one tick (or until shutdown), advance the slot. */
    void Run()
    {
        while (needRun_) {
            ExecuteCurrentSlot();
            if (!needRun_) {
                break;
            }
            {
                std::unique_lock<std::mutex> needRunLock(needRunMutex_);
                needRunCondition_.wait_for(needRunLock, std::chrono::seconds(1), [this] { return !needRun_; });
            }
            std::lock_guard<std::mutex> guard(mutex_);
            index_ = (index_ + 1) % SLOT_COUNT;
        }
    }

    /** Detaches the elements of the current slot under the lock and executes them after releasing it. */
    void ExecuteCurrentSlot()
    {
        ElemSet due;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            due.swap(elems_[index_]);
            for (const auto &elem : due) {
                indexMap_.erase(elem);
            }
        }
        // Run the callbacks without holding mutex_, so a task may call Put() on this queue and
        // other threads are not blocked while tasks execute.
        for (const auto &elem : due) {
            if (elem) {
                elem->Execute();
            }
        }
    }

    std::thread pthread_;
    size_t index_ = 0;
    std::mutex mutex_; // guards index_, elems_ and indexMap_
    std::atomic_bool needRun_{true};
    std::condition_variable needRunCondition_;
    std::mutex needRunMutex_; // paired with needRunCondition_
    std::array<ElemSet, SLOT_COUNT> elems_;
    std::map<std::shared_ptr<T>, size_t, std::owner_less<std::shared_ptr<T>>> indexMap_;
};
} // namespace OHOS::NetManagerStandard

#endif // COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H