1/* 2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#ifndef CHANNEL_H 17#define CHANNEL_H 18 19#include <condition_variable> 20#include <deque> 21#include <memory> 22#include <mutex> 23#include <type_traits> 24 25namespace OHOS { 26namespace Msdp { 27namespace DeviceStatus { 28 29template<typename Event> 30class Channel { 31 static_assert(std::is_enum_v<Event> || std::is_integral_v<Event> || 32 (std::is_class_v<Event> && 33 std::is_default_constructible_v<Event> && 34 std::is_copy_constructible_v<Event>)); 35 36public: 37 enum ChannelError { 38 NO_ERROR = 0, 39 QUEUE_IS_FULL = -1, 40 NO_CHANNEL = -2, 41 INACTIVE_CHANNEL = -3, 42 }; 43 44 class Sender final { 45 friend class Channel<Event>; 46 47 public: 48 Sender() = default; 49 ~Sender() = default; 50 51 Sender(const Sender &other) 52 : channel_(other.channel_) 53 {} 54 55 Sender(Sender &&other) 56 : channel_(other.channel_) 57 { 58 other.channel_ = nullptr; 59 } 60 61 Sender& operator=(const Sender &other) 62 { 63 channel_ = other.channel_; 64 return *this; 65 } 66 67 Sender& operator=(Sender &&other) 68 { 69 channel_ = other.channel_; 70 other.channel_ = nullptr; 71 return *this; 72 } 73 74 int32_t Send(const Event &event) 75 { 76 if (channel_ == nullptr) { 77 return ChannelError::NO_CHANNEL; 78 } 79 return channel_->Send(event); 80 } 81 82 private: 83 Sender(std::shared_ptr<Channel<Event>> channel) 84 : channel_(channel) 85 {} 86 87 std::shared_ptr<Channel<Event>> channel_ { nullptr }; 88 }; 89 90 class Receiver final { 91 friend class Channel<Event>; 92 93 public: 94 Receiver() = default; 95 ~Receiver() = default; 96 97 Receiver(const Receiver &other) 98 : channel_(other.channel_) 99 {} 100 101 Receiver(Receiver &&other) 102 : channel_(other.channel_) 103 { 104 other.channel_ = nullptr; 105 } 106 107 Receiver& operator=(const Receiver &other) 108 { 109 channel_ = other.channel_; 110 return *this; 111 } 112 113 Receiver& operator=(Receiver &&other) 114 { 115 channel_ = other.channel_; 116 other.channel_ = nullptr; 117 return *this; 118 } 119 120 void Enable() 121 { 122 if (channel_ != nullptr) { 123 channel_->Enable(); 124 } 125 } 126 127 void Disable() 128 { 129 if (channel_ != nullptr) { 130 channel_->Disable(); 131 } 132 } 133 134 Event Peek() 135 { 136 return (channel_ != nullptr ? channel_->Peek() : Event()); 137 } 138 139 void Pop() 140 { 141 if (channel_ != nullptr) { 142 channel_->Pop(); 143 } 144 } 145 146 Event Receive() 147 { 148 return (channel_ != nullptr ? channel_->Receive() : Event()); 149 } 150 151 private: 152 Receiver(std::shared_ptr<Channel<Event>> channel) 153 : channel_(channel) 154 {} 155 156 std::shared_ptr<Channel<Event>> channel_ { nullptr }; 157 }; 158 159 Channel() = default; 160 ~Channel() = default; 161 162 static std::pair<Sender, Receiver> OpenChannel(); 163 164private: 165 void Enable(); 166 void Disable(); 167 int32_t Send(const Event &event); 168 Event Peek(); 169 void Pop(); 170 Event Receive(); 171 172 static inline constexpr size_t QUEUE_CAPACITY { 1024 }; 173 174 std::mutex lock_; 175 bool isActive_ { false }; 176 std::condition_variable empty_; 177 std::deque<Event> queue_; 178}; 179 180template<typename Event> 181std::pair<typename Channel<Event>::Sender, typename Channel<Event>::Receiver> Channel<Event>::OpenChannel() 182{ 183 std::shared_ptr<Channel<Event>> channel = std::make_shared<Channel<Event>>(); 184 return std::make_pair(Channel<Event>::Sender(channel), Channel<Event>::Receiver(channel)); 185} 186 187template<typename Event> 188void Channel<Event>::Enable() 189{ 190 std::unique_lock<std::mutex> lock(lock_); 191 isActive_ = true; 192} 193 194template<typename Event> 195void Channel<Event>::Disable() 196{ 197 std::unique_lock<std::mutex> lock(lock_); 198 isActive_ = false; 199 queue_.clear(); 200} 201 202template<typename Event> 203int32_t Channel<Event>::Send(const Event &event) 204{ 205 std::unique_lock<std::mutex> lock(lock_); 206 if (!isActive_) { 207 return ChannelError::INACTIVE_CHANNEL; 208 } 209 if (queue_.size() >= QUEUE_CAPACITY) { 210 return ChannelError::QUEUE_IS_FULL; 211 } 212 bool needNotify = queue_.empty(); 213 queue_.push_back(event); 214 if (needNotify) { 215 empty_.notify_all(); 216 } 217 return ChannelError::NO_ERROR; 218} 219 220template<typename Event> 221Event Channel<Event>::Peek() 222{ 223 std::unique_lock<std::mutex> lock(lock_); 224 if (queue_.empty()) { 225 empty_.wait(lock, [this] { 226 return !queue_.empty(); 227 }); 228 } 229 return queue_.front(); 230} 231 232template<typename Event> 233void Channel<Event>::Pop() 234{ 235 std::unique_lock<std::mutex> lock(lock_); 236 if (queue_.empty()) { 237 empty_.wait(lock, [this] { 238 return !queue_.empty(); 239 }); 240 } 241 queue_.pop_front(); 242} 243 244template<typename Event> 245Event Channel<Event>::Receive() 246{ 247 std::unique_lock<std::mutex> lock(lock_); 248 if (queue_.empty()) { 249 empty_.wait(lock, [this] { 250 return !queue_.empty(); 251 }); 252 } 253 Event event = queue_.front(); 254 queue_.pop_front(); 255 return event; 256} 257} // namespace DeviceStatus 258} // namespace Msdp 259} // namespace OHOS 260#endif // CHANNEL_H