1/*
2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <algorithm>
17#include <map>
18
19#include "event_manager.h"
20
21#include "napi_utils.h"
22#include "netstack_log.h"
23
24namespace OHOS::NetStack {
25static constexpr const int CALLBACK_PARAM_NUM = 1;
26static constexpr const int ASYNC_CALLBACK_PARAM_NUM = 2;
27static constexpr const char *ON_HEADER_RECEIVE = "headerReceive";
28static constexpr const char *ON_HEADERS_RECEIVE = "headersReceive";
29
30EventManager::EventManager() : data_(nullptr), eventRef_(nullptr), isDestroy_(false) {}
31
32EventManager::~EventManager()
33{
34    NETSTACK_LOGD("EventManager is destructed by the destructor");
35}
36
37void EventManager::AddListener(napi_env env, const std::string &type, napi_value callback, bool once,
38                               bool asyncCallback)
39{
40    std::lock_guard lock(mutexForListenersAndEmitByUv_);
41    auto it = std::remove_if(listeners_.begin(), listeners_.end(),
42                             [type](const EventListener &listener) -> bool { return listener.MatchType(type); });
43    if (it != listeners_.end()) {
44        listeners_.erase(it, listeners_.end());
45    }
46
47    listeners_.emplace_back(GetCurrentThreadId(), env, type, callback, once, asyncCallback);
48}
49
50void EventManager::DeleteListener(const std::string &type, napi_value callback)
51{
52    std::lock_guard lock(mutexForListenersAndEmitByUv_);
53    auto it =
54        std::remove_if(listeners_.begin(), listeners_.end(), [type, callback](const EventListener &listener) -> bool {
55            return listener.Match(type, callback);
56        });
57    listeners_.erase(it, listeners_.end());
58}
59
60void EventManager::Emit(const std::string &type, const std::pair<napi_value, napi_value> &argv)
61{
62    std::lock_guard lock(mutexForEmitAndEmitByUv_);
63    auto listeners = listeners_;
64    std::for_each(listeners.begin(), listeners.end(), [type, argv](const EventListener &listener) {
65        if (listener.IsAsyncCallback()) {
66            /* AsyncCallback(BusinessError error, T data) */
67            napi_value arg[ASYNC_CALLBACK_PARAM_NUM] = {argv.first, argv.second};
68            listener.Emit(type, ASYNC_CALLBACK_PARAM_NUM, arg);
69        } else {
70            /* Callback(T data) */
71            napi_value arg[CALLBACK_PARAM_NUM] = {argv.second};
72            listener.Emit(type, CALLBACK_PARAM_NUM, arg);
73        }
74    });
75
76    auto it = std::remove_if(listeners_.begin(), listeners_.end(),
77                             [type](const EventListener &listener) -> bool { return listener.MatchOnce(type); });
78    listeners_.erase(it, listeners_.end());
79}
80
81void EventManager::SetData(void *data)
82{
83    std::lock_guard<std::mutex> lock(dataMutex_);
84    data_ = data;
85}
86
87void *EventManager::GetData()
88{
89    std::lock_guard<std::mutex> lock(dataMutex_);
90    return data_;
91}
92
93void EventManager::EmitByUvWithoutCheckShared(const std::string &type, void *data, void (*Handler)(uv_work_t *, int))
94{
95    std::lock_guard lock1(mutexForEmitAndEmitByUv_);
96    std::lock_guard lock2(mutexForListenersAndEmitByUv_);
97    bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
98        return listener.MatchType(ON_HEADER_RECEIVE);
99    }) != listeners_.end();
100
101    bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
102        return listener.MatchType(ON_HEADERS_RECEIVE);
103    }) != listeners_.end();
104    if (!foundHeader && !foundHeaders) {
105        if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
106            auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
107            delete tempMap;
108        }
109    } else if (foundHeader && !foundHeaders) {
110        if (type == ON_HEADERS_RECEIVE) {
111            auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
112            delete tempMap;
113        }
114    } else if (!foundHeader) {
115        if (type == ON_HEADER_RECEIVE) {
116            auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
117            delete tempMap;
118        }
119    }
120
121    std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this](const EventListener &listener) {
122        if (listener.MatchType(type)) {
123            auto workWrapper = new UvWorkWrapperShared(data, listener.GetEnv(), type, shared_from_this());
124            listener.EmitByUv(type, workWrapper, Handler);
125        }
126    });
127}
128
129void EventManager::SetQueueData(void *data)
130{
131    std::lock_guard<std::mutex> lock(dataQueueMutex_);
132    dataQueue_.push(data);
133}
134
135void *EventManager::GetQueueData()
136{
137    std::lock_guard<std::mutex> lock(dataQueueMutex_);
138    if (!dataQueue_.empty()) {
139        auto data = dataQueue_.front();
140        dataQueue_.pop();
141        return data;
142    }
143    NETSTACK_LOGE("eventManager data queue is empty");
144    return nullptr;
145}
146
147void EventManager::EmitByUvWithoutCheck(const std::string &type, void *data, void(Handler)(uv_work_t *, int status))
148{
149    std::lock_guard lock1(mutexForEmitAndEmitByUv_);
150    std::lock_guard lock2(mutexForListenersAndEmitByUv_);
151    bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
152                           return listener.MatchType(ON_HEADER_RECEIVE);
153                       }) != listeners_.end();
154    bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
155                            return listener.MatchType(ON_HEADERS_RECEIVE);
156                        }) != listeners_.end();
157    if (!foundHeader && !foundHeaders) {
158        if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
159            auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
160            delete tempMap;
161        }
162    } else if (foundHeader && !foundHeaders) {
163        if (type == ON_HEADERS_RECEIVE) {
164            auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
165            delete tempMap;
166        }
167    } else if (!foundHeader) {
168        if (type == ON_HEADER_RECEIVE) {
169            auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
170            delete tempMap;
171        }
172    }
173
174    std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this](const EventListener &listener) {
175        if (listener.MatchType(type)) {
176            auto workWrapper = new UvWorkWrapper(data, listener.GetEnv(), type, this);
177            listener.EmitByUv(type, workWrapper, Handler);
178        }
179    });
180}
181
182void EventManager::EmitByUv(const std::string &type, void *data, void(Handler)(uv_work_t *, int status))
183{
184    std::lock_guard lock1(mutexForEmitAndEmitByUv_);
185    std::lock_guard lock2(mutexForListenersAndEmitByUv_);
186    if (!EventManager::IsManagerValid(this)) {
187        return;
188    }
189    bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
190                           return listener.MatchType(ON_HEADER_RECEIVE);
191                       }) != listeners_.end();
192    bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
193                            return listener.MatchType(ON_HEADERS_RECEIVE);
194                        }) != listeners_.end();
195    if (!foundHeader && !foundHeaders) {
196        if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
197            auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
198            delete tempMap;
199        }
200    } else if (foundHeader && !foundHeaders) {
201        if (type == ON_HEADERS_RECEIVE) {
202            auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
203            delete tempMap;
204        }
205    } else if (!foundHeader) {
206        if (type == ON_HEADER_RECEIVE) {
207            auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
208            delete tempMap;
209        }
210    }
211
212    std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this](const EventListener &listener) {
213        if (listener.MatchType(type)) {
214            auto workWrapper = new UvWorkWrapper(data, listener.GetEnv(), type, this);
215            listener.EmitByUv(type, workWrapper, Handler);
216        }
217    });
218}
219
220bool EventManager::HasEventListener(const std::string &type)
221{
222    std::lock_guard lock(mutexForListenersAndEmitByUv_);
223    return std::any_of(listeners_.begin(), listeners_.end(),
224                       [&type](const EventListener &listener) -> bool { return listener.MatchType(type); });
225}
226
227void EventManager::DeleteListener(const std::string &type)
228{
229    std::lock_guard lock(mutexForListenersAndEmitByUv_);
230    auto it = std::remove_if(listeners_.begin(), listeners_.end(),
231                             [type](const EventListener &listener) -> bool { return listener.MatchType(type); });
232    listeners_.erase(it, listeners_.end());
233}
234
235std::unordered_set<EventManager *> EventManager::validManager_;
236std::mutex EventManager::mutexForManager_;
237EventManagerMagic EventManager::magic_;
238
239void EventManager::SetInvalid(EventManager *manager)
240{
241    if (magic_.magicNumber_ != EVENT_MANAGER_MAGIC_NUMBER) {
242        return;
243    }
244    std::lock_guard lock(mutexForManager_);
245    auto pos = validManager_.find(manager);
246    if (pos == validManager_.end()) {
247        NETSTACK_LOGE("The manager is not in the unordered_set");
248        return;
249    }
250    validManager_.erase(pos);
251    delete manager;
252    manager = nullptr;
253}
254
255bool EventManager::IsManagerValid(EventManager *manager)
256{
257    if (magic_.magicNumber_ != EVENT_MANAGER_MAGIC_NUMBER) {
258        return false;
259    }
260    std::lock_guard lock(mutexForManager_);
261    return validManager_.find(manager) != validManager_.end();
262}
263
264void EventManager::SetValid(EventManager *manager)
265{
266    if (magic_.magicNumber_ != EVENT_MANAGER_MAGIC_NUMBER) {
267        return;
268    }
269    std::lock_guard lock(mutexForManager_);
270    validManager_.emplace(manager);
271}
272
273void EventManager::CreateEventReference(napi_env env, napi_value value)
274{
275    if (env != nullptr && value != nullptr) {
276        eventRef_ = NapiUtils::CreateReference(env, value);
277    }
278}
279
280void EventManager::DeleteEventReference(napi_env env)
281{
282    if (env != nullptr && eventRef_ != nullptr) {
283        NapiUtils::DeleteReference(env, eventRef_);
284    }
285    eventRef_ = nullptr;
286}
287
288void EventManager::SetEventDestroy(bool flag)
289{
290    isDestroy_.store(flag);
291}
292
293bool EventManager::IsEventDestroy()
294{
295    return isDestroy_.load();
296}
297
298const std::string &EventManager::GetWebSocketTextData()
299{
300    return webSocketTextData_;
301}
302
303void EventManager::AppendWebSocketTextData(void *data, size_t length)
304{
305    webSocketTextData_.append(reinterpret_cast<char *>(data), length);
306}
307
308const std::string &EventManager::GetWebSocketBinaryData()
309{
310    return webSocketBinaryData_;
311}
312
313void EventManager::AppendWebSocketBinaryData(void *data, size_t length)
314{
315    webSocketBinaryData_.append(reinterpret_cast<char *>(data), length);
316}
317
318void EventManager::ClearWebSocketTextData()
319{
320    webSocketTextData_.clear();
321}
322
323void EventManager::ClearWebSocketBinaryData()
324{
325    webSocketBinaryData_.clear();
326}
327
328void EventManager::NotifyRcvThdExit()
329{
330    std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
331    sockRcvExit_ = true;
332    sockRcvThdCon_.notify_one();
333}
334
335void EventManager::WaitForRcvThdExit()
336{
337    std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
338    sockRcvThdCon_.wait(lock, [this]() { return sockRcvExit_; });
339}
340
341void EventManager::SetReuseAddr(bool reuse)
342{
343    isReuseAddr_.store(reuse);
344}
345
346bool EventManager::GetReuseAddr()
347{
348    return isReuseAddr_.load();
349}
350
351void EventManager::SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> &userData)
352{
353    std::lock_guard<std::mutex> lock(dataMutex_);
354    webSocketUserData_ = userData;
355}
356
357std::shared_ptr<Websocket::UserData> EventManager::GetWebSocketUserData()
358{
359    std::lock_guard<std::mutex> lock(dataMutex_);
360    return webSocketUserData_;
361}
362
363UvWorkWrapper::UvWorkWrapper(void *theData, napi_env theEnv, std::string eventType, EventManager *eventManager)
364    : data(theData), env(theEnv), type(std::move(eventType)), manager(eventManager)
365{
366}
367
368UvWorkWrapperShared::UvWorkWrapperShared(void *theData, napi_env theEnv, std::string eventType,
369                                         const std::shared_ptr<EventManager> &eventManager)
370    : data(theData), env(theEnv), type(std::move(eventType)), manager(eventManager)
371{
372}
373} // namespace OHOS::NetStack
374