1/*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#ifndef COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H
17#define COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H
18
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <thread>
27
28namespace OHOS::NetManagerStandard {
/**
 * Timing-wheel style queue that calls T::Execute() on queued elements after a delay.
 *
 * The queue owns ARRAY_SIZE + DELAYED_COUNT slots and one worker thread. On every tick the
 * worker runs (and forgets) everything stored in the current slot and then advances to the
 * next slot, wrapping around at the end. Put() stores an element DELAYED_COUNT slots ahead
 * of the current one, so it is executed roughly DELAYED_COUNT ticks later.
 *
 * Elements are identified by shared_ptr ownership (std::owner_less): putting an element that
 * is already pending moves it to the new slot instead of scheduling it twice.
 *
 * @tparam T             Element type; must provide a member function Execute().
 * @tparam ARRAY_SIZE    Number of slots in addition to DELAYED_COUNT.
 * @tparam DELAYED_COUNT Number of ticks between Put() and execution.
 */
template <typename T, size_t ARRAY_SIZE, size_t DELAYED_COUNT> class DelayedQueue {
    // The slot index is reduced modulo the slot count, so there must be at least one slot.
    static_assert(ARRAY_SIZE + DELAYED_COUNT > 0, "DelayedQueue needs at least one slot");

public:
    /** Creates a queue that ticks once per second and starts its worker thread. */
    DelayedQueue() : DelayedQueue(std::chrono::seconds(1)) {}

    /**
     * Creates a queue with a custom tick length and starts its worker thread.
     *
     * @param tickInterval Time the worker waits between two slots. Should be positive;
     *                     a zero or negative value makes the worker advance without waiting.
     */
    explicit DelayedQueue(std::chrono::milliseconds tickInterval)
        : index_(0), needRun_(true), tickInterval_(tickInterval)
    {
        // Started in the body so that every member is initialised before the worker touches it.
        pthread_ = std::thread([this]() { Run(); });
    }

    // The worker thread captures `this`, so the object must stay where it was created.
    DelayedQueue(const DelayedQueue &) = delete;
    DelayedQueue &operator=(const DelayedQueue &) = delete;

    /** Stops the worker and joins it. Elements that are still pending are dropped, not executed. */
    ~DelayedQueue()
    {
        {
            // The flag is changed under the mutex the worker waits on; otherwise the notification
            // could slip in between the worker's predicate check and its wait and be lost.
            std::lock_guard<std::mutex> guard(needRunMutex_);
            needRun_ = false;
        }
        needRunCondition_.notify_all();
        if (pthread_.joinable()) {
            pthread_.join();
        }
    }

    /**
     * Schedules elem to be executed DELAYED_COUNT ticks from now.
     *
     * If elem is already pending, its earlier schedule is cancelled and replaced by the new one.
     * May be called from any thread, including from within T::Execute().
     *
     * @param elem Element to schedule. A null pointer is accepted and skipped at execution time.
     */
    void Put(const std::shared_ptr<T> &elem)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        const size_t target = (index_ + DELAYED_COUNT) % SLOT_COUNT;
        auto found = indexMap_.find(elem);
        if (found != indexMap_.end()) {
            // Already pending: take it out of its old slot before re-inserting.
            elems_[found->second].erase(elem);
            found->second = target;
        } else {
            indexMap_.emplace(elem, target);
        }
        elems_[target].insert(elem);
    }

private:
    using ElemSet = std::set<std::shared_ptr<T>, std::owner_less<std::shared_ptr<T>>>;

    static constexpr size_t SLOT_COUNT = ARRAY_SIZE + DELAYED_COUNT;

    /** Worker loop: run the current slot, wait one tick (or until stopped), advance. */
    void Run()
    {
        while (needRun_) {
            ExecuteDueElems();

            std::unique_lock<std::mutex> needRunLock(needRunMutex_);
            // wait_for returns the predicate value: true means a stop was requested.
            if (needRunCondition_.wait_for(needRunLock, tickInterval_, [this] { return !needRun_; })) {
                break;
            }
            needRunLock.unlock();

            std::lock_guard<std::mutex> guard(mutex_);
            index_ = (index_ + 1) % SLOT_COUNT;
        }
    }

    /** Detaches everything stored in the current slot and executes it. */
    void ExecuteDueElems()
    {
        ElemSet due;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            due.swap(elems_[index_]);
            for (const auto &elem : due) {
                indexMap_.erase(elem);
            }
        }
        // Callbacks run without mutex_ held, so Execute() may call Put() on this queue and
        // other threads are not blocked in Put() while callbacks are running.
        for (const auto &elem : due) {
            if (elem) {
                elem->Execute();
            }
        }
    }

    std::thread pthread_;
    size_t index_;                             // current slot; guarded by mutex_
    std::mutex mutex_;                         // guards index_, elems_ and indexMap_
    std::atomic_bool needRun_;                 // cleared by the destructor to stop the worker
    std::condition_variable needRunCondition_; // wakes the worker early on shutdown
    std::mutex needRunMutex_;                  // pairs with needRunCondition_
    std::array<ElemSet, SLOT_COUNT> elems_;    // the wheel: one set of pending elements per slot
    std::map<std::shared_ptr<T>, size_t, std::owner_less<std::shared_ptr<T>>> indexMap_; // element -> slot
    std::chrono::milliseconds tickInterval_;   // time between two slots
};
91} // namespace OHOS::NetManagerStandard
92
93#endif // COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H
94