1b1b8bc3fSopenharmony_ci/*
2b1b8bc3fSopenharmony_ci * Copyright (c) 2022 Huawei Device Co., Ltd.
3b1b8bc3fSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4b1b8bc3fSopenharmony_ci * you may not use this file except in compliance with the License.
5b1b8bc3fSopenharmony_ci * You may obtain a copy of the License at
6b1b8bc3fSopenharmony_ci *
7b1b8bc3fSopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8b1b8bc3fSopenharmony_ci *
9b1b8bc3fSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10b1b8bc3fSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11b1b8bc3fSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12b1b8bc3fSopenharmony_ci * See the License for the specific language governing permissions and
13b1b8bc3fSopenharmony_ci * limitations under the License.
14b1b8bc3fSopenharmony_ci */
15b1b8bc3fSopenharmony_ci
16b1b8bc3fSopenharmony_ci#ifndef COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H
17b1b8bc3fSopenharmony_ci#define COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H
18b1b8bc3fSopenharmony_ci
19b1b8bc3fSopenharmony_ci#include <array>
20b1b8bc3fSopenharmony_ci#include <atomic>
21b1b8bc3fSopenharmony_ci#include <condition_variable>
22b1b8bc3fSopenharmony_ci#include <map>
23b1b8bc3fSopenharmony_ci#include <memory>
24b1b8bc3fSopenharmony_ci#include <mutex>
25b1b8bc3fSopenharmony_ci#include <set>
26b1b8bc3fSopenharmony_ci#include <thread>
27b1b8bc3fSopenharmony_ci
namespace OHOS::NetManagerStandard {
/**
 * @brief Ring of time slots served by a background worker thread.
 *
 * The queue owns ARRAY_SIZE + DELAYED_COUNT slots. The worker serves the current slot
 * (calling Execute() on every non-null element filed there and then forgetting those elements),
 * waits up to one second, and advances to the next slot, wrapping around at the end of the ring.
 * Put() files an element DELAYED_COUNT slots ahead of the slot currently being served, so it is
 * executed once the worker has advanced that many slots.
 *
 * Elements are identified by ownership (std::owner_less), not by pointer value. Putting an element
 * that is already pending moves it to the new slot instead of scheduling it twice.
 *
 * @tparam T             Element type; must provide an Execute() member callable through std::shared_ptr<T>.
 * @tparam ARRAY_SIZE    Number of slots in addition to DELAYED_COUNT.
 * @tparam DELAYED_COUNT Distance, in slots, between the current slot and the slot used by Put().
 */
template <typename T, size_t ARRAY_SIZE, size_t DELAYED_COUNT> class DelayedQueue {
public:
    /**
     * @brief Creates an empty queue and starts the worker thread.
     */
    DelayedQueue() : index_(0), needRun_(true)
    {
        // Started in the constructor body so that every data member is fully constructed
        // before the worker can touch it.
        pthread_ = std::thread([this]() { Run(); });
    }

    /**
     * @brief Stops the worker thread and waits for it to finish.
     *
     * Elements that are still pending are dropped without being executed.
     */
    ~DelayedQueue()
    {
        {
            // The flag must change while needRunMutex_ is held: otherwise the worker could test the
            // predicate, miss the notification below and sleep for the whole timeout.
            std::lock_guard<std::mutex> needRunLock(needRunMutex_);
            needRun_ = false;
        }
        needRunCondition_.notify_all();
        if (pthread_.joinable()) {
            pthread_.join();
        }
    }

    // The worker thread captures `this`, so the object must never be copied.
    DelayedQueue(const DelayedQueue &) = delete;
    DelayedQueue &operator=(const DelayedQueue &) = delete;

    /**
     * @brief Schedules @p elem DELAYED_COUNT slots ahead of the slot currently being served.
     *
     * If @p elem is already pending it is removed from its previous slot first, i.e. its delay restarts.
     * A null pointer is accepted and stored, but it is skipped when its slot is served.
     * May be called from any thread, including from inside T::Execute().
     *
     * @param elem Element to schedule.
     */
    void Put(const std::shared_ptr<T> &elem)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        const auto pending = indexMap_.find(elem);
        if (pending != indexMap_.end() && pending->second < SLOT_COUNT) {
            // erase() by key is a no-op when the element is no longer in that slot.
            elems_[pending->second].erase(elem);
        }
        const size_t slot = (index_ + DELAYED_COUNT) % SLOT_COUNT;
        elems_[slot].insert(elem);
        indexMap_[elem] = slot;
    }

private:
    using ElemSet = std::set<std::shared_ptr<T>, std::owner_less<std::shared_ptr<T>>>;

    // Total number of slots in the ring.
    static constexpr size_t SLOT_COUNT = ARRAY_SIZE + DELAYED_COUNT;
    static_assert(SLOT_COUNT > 0, "DelayedQueue needs at least one slot (ARRAY_SIZE + DELAYED_COUNT > 0)");

    // Worker loop: serve the current slot, wait one tick (or until stopped), advance to the next slot.
    void Run()
    {
        while (needRun_) {
            ExecuteCurrentSlot();
            if (!needRun_) {
                break;
            }
            {
                // Scoped so that needRunMutex_ is released before mutex_ is taken below.
                std::unique_lock<std::mutex> needRunLock(needRunMutex_);
                needRunCondition_.wait_for(needRunLock, std::chrono::seconds(1), [this] { return !needRun_; });
            }
            std::lock_guard<std::mutex> guard(mutex_);
            index_ = (index_ + 1) % SLOT_COUNT;
        }
    }

    // Detaches the elements of the current slot under the lock, then executes them without it.
    void ExecuteCurrentSlot()
    {
        ElemSet due;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            due.swap(elems_[index_]);
            for (const auto &elem : due) {
                indexMap_.erase(elem);
            }
        }
        // Callbacks run outside mutex_, so Execute() may call Put() on this queue and
        // concurrent Put() callers are not blocked while the slot is being served.
        for (const auto &elem : due) {
            if (elem) {
                elem->Execute();
            }
        }
    }

    size_t index_;                             // slot currently being served; guarded by mutex_
    std::mutex mutex_;                         // guards index_, elems_ and indexMap_
    std::atomic_bool needRun_;                 // cleared by the destructor to stop the worker
    std::condition_variable needRunCondition_; // wakes the worker early on shutdown
    std::mutex needRunMutex_;                  // pairs with needRunCondition_
    std::array<ElemSet, SLOT_COUNT> elems_;    // pending elements, one set per slot
    std::map<std::shared_ptr<T>, size_t, std::owner_less<std::shared_ptr<T>>> indexMap_; // element -> its slot
    std::thread pthread_;                      // worker thread running Run()
};
} // namespace OHOS::NetManagerStandard
92b1b8bc3fSopenharmony_ci
93b1b8bc3fSopenharmony_ci#endif // COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H
94