1/*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#ifndef CHANNEL_H
17#define CHANNEL_H
18
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <type_traits>
#include <utility>
24
25namespace OHOS {
26namespace Msdp {
27namespace DeviceStatus {
28
// Bounded FIFO of Event values shared between Sender and Receiver handles.
//
// A channel is created with OpenChannel(), which returns one Sender and one
// Receiver that both refer to the same underlying Channel object through a
// std::shared_ptr. Handles are cheap to copy; every copy refers to the same
// queue. All queue operations take the internal mutex.
//
// A freshly opened channel is inactive: Send() is rejected until the receiving
// side calls Enable(). Disable() deactivates the channel and discards every
// event that is still queued.
template<typename Event>
class Channel {
    static_assert(std::is_enum_v<Event> || std::is_integral_v<Event> ||
                  (std::is_class_v<Event> &&
                   std::is_default_constructible_v<Event> &&
                   std::is_copy_constructible_v<Event>),
                  "Event must be an enum, an integral type, or a default- and copy-constructible class");

public:
    // Status codes returned (as int32_t) by Sender::Send().
    enum ChannelError {
        NO_ERROR = 0,           // The event was queued.
        QUEUE_IS_FULL = -1,     // The queue already holds QUEUE_CAPACITY events.
        NO_CHANNEL = -2,        // The handle is not attached to any channel.
        INACTIVE_CHANNEL = -3,  // The channel has not been enabled (or was disabled).
    };

    // Producer-side handle. A default-constructed or moved-from Sender is
    // detached and reports NO_CHANNEL from Send().
    class Sender final {
        friend class Channel<Event>;

    public:
        Sender() = default;
        ~Sender() = default;

        // The only state is a shared_ptr, so the compiler-generated operations
        // are exactly right: copies share the channel, moves leave the source
        // detached, moves are noexcept, and self-move-assignment keeps the
        // channel instead of dropping it.
        Sender(const Sender &other) = default;
        Sender(Sender &&other) noexcept = default;
        Sender& operator=(const Sender &other) = default;
        Sender& operator=(Sender &&other) noexcept = default;

        // Queues a copy of `event` on the attached channel.
        // Returns NO_CHANNEL when detached, otherwise the result of the
        // channel's Send() (NO_ERROR, INACTIVE_CHANNEL or QUEUE_IS_FULL).
        int32_t Send(const Event &event)
        {
            if (channel_ == nullptr) {
                return ChannelError::NO_CHANNEL;
            }
            return channel_->Send(event);
        }

    private:
        // Only Channel (a friend) attaches a handle to a channel.
        explicit Sender(std::shared_ptr<Channel<Event>> channel)
            : channel_(std::move(channel))
        {}

        std::shared_ptr<Channel<Event>> channel_ { nullptr };
    };

    // Consumer-side handle. On a default-constructed or moved-from Receiver,
    // Enable(), Disable() and Pop() do nothing, while Peek() and Receive()
    // return a value-initialized Event.
    class Receiver final {
        friend class Channel<Event>;

    public:
        Receiver() = default;
        ~Receiver() = default;

        // See Sender: the defaulted operations give shared copies, detaching
        // noexcept moves, and safe self-move-assignment.
        Receiver(const Receiver &other) = default;
        Receiver(Receiver &&other) noexcept = default;
        Receiver& operator=(const Receiver &other) = default;
        Receiver& operator=(Receiver &&other) noexcept = default;

        // Marks the channel active so that Send() starts accepting events.
        void Enable()
        {
            if (channel_ != nullptr) {
                channel_->Enable();
            }
        }

        // Marks the channel inactive and discards all queued events.
        void Disable()
        {
            if (channel_ != nullptr) {
                channel_->Disable();
            }
        }

        // Returns a copy of the oldest queued event without removing it.
        // Blocks while the queue is empty.
        Event Peek()
        {
            return (channel_ != nullptr ? channel_->Peek() : Event());
        }

        // Removes the oldest queued event. Blocks while the queue is empty.
        void Pop()
        {
            if (channel_ != nullptr) {
                channel_->Pop();
            }
        }

        // Removes and returns the oldest queued event.
        // Blocks while the queue is empty.
        Event Receive()
        {
            return (channel_ != nullptr ? channel_->Receive() : Event());
        }

    private:
        // Only Channel (a friend) attaches a handle to a channel.
        explicit Receiver(std::shared_ptr<Channel<Event>> channel)
            : channel_(std::move(channel))
        {}

        std::shared_ptr<Channel<Event>> channel_ { nullptr };
    };

    Channel() = default;
    ~Channel() = default;

    // Creates a new, initially inactive channel and returns the pair of
    // handles attached to it.
    static std::pair<Sender, Receiver> OpenChannel();

private:
    // Queue operations; reached only through the Sender/Receiver handles.
    void Enable();
    void Disable();
    int32_t Send(const Event &event);
    Event Peek();
    void Pop();
    Event Receive();

    // Maximum number of events held at once; Send() fails beyond this.
    static inline constexpr size_t QUEUE_CAPACITY { 1024 };

    std::mutex lock_;                // Guards isActive_ and queue_.
    bool isActive_ { false };        // Send() is accepted only while true.
    std::condition_variable empty_;  // Signalled when an event lands in an empty queue.
    std::deque<Event> queue_;        // Pending events, oldest at the front.
};
179
180template<typename Event>
181std::pair<typename Channel<Event>::Sender, typename Channel<Event>::Receiver> Channel<Event>::OpenChannel()
182{
183    std::shared_ptr<Channel<Event>> channel = std::make_shared<Channel<Event>>();
184    return std::make_pair(Channel<Event>::Sender(channel), Channel<Event>::Receiver(channel));
185}
186
187template<typename Event>
188void Channel<Event>::Enable()
189{
190    std::unique_lock<std::mutex> lock(lock_);
191    isActive_ = true;
192}
193
194template<typename Event>
195void Channel<Event>::Disable()
196{
197    std::unique_lock<std::mutex> lock(lock_);
198    isActive_ = false;
199    queue_.clear();
200}
201
202template<typename Event>
203int32_t Channel<Event>::Send(const Event &event)
204{
205    std::unique_lock<std::mutex> lock(lock_);
206    if (!isActive_) {
207        return ChannelError::INACTIVE_CHANNEL;
208    }
209    if (queue_.size() >= QUEUE_CAPACITY) {
210        return ChannelError::QUEUE_IS_FULL;
211    }
212    bool needNotify = queue_.empty();
213    queue_.push_back(event);
214    if (needNotify) {
215        empty_.notify_all();
216    }
217    return ChannelError::NO_ERROR;
218}
219
220template<typename Event>
221Event Channel<Event>::Peek()
222{
223    std::unique_lock<std::mutex> lock(lock_);
224    if (queue_.empty()) {
225        empty_.wait(lock, [this] {
226            return !queue_.empty();
227        });
228    }
229    return queue_.front();
230}
231
232template<typename Event>
233void Channel<Event>::Pop()
234{
235    std::unique_lock<std::mutex> lock(lock_);
236    if (queue_.empty()) {
237        empty_.wait(lock, [this] {
238            return !queue_.empty();
239        });
240    }
241    queue_.pop_front();
242}
243
244template<typename Event>
245Event Channel<Event>::Receive()
246{
247    std::unique_lock<std::mutex> lock(lock_);
248    if (queue_.empty()) {
249        empty_.wait(lock, [this] {
250            return !queue_.empty();
251        });
252    }
253    Event event = queue_.front();
254    queue_.pop_front();
255    return event;
256}
257} // namespace DeviceStatus
258} // namespace Msdp
259} // namespace OHOS
260#endif // CHANNEL_H