1/*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include <atomic>
#include <climits>
#include <iostream>
#include <mutex>
#include <ostream>
#include <queue>
#include <thread>
#include "utils_log.h"
#include "common_event_sys_errors.h"
#include "io_event_epoll.h"
#include "io_event_reactor.h"
26
27namespace OHOS {
28namespace Utils {
29
30IOEventReactor::IOEventReactor()
31    :loopReady_(false), enabled_(false), count_(0), ioHandlers_(INIT_FD_NUMS), backend_(new IOEventEpoll()) {}
32
33IOEventReactor::~IOEventReactor()
34{
35    CleanUp();
36}
37
38ErrCode IOEventReactor::SetUp()
39{
40    if (backend_ == nullptr) {
41        backend_ = std::make_unique<IOEventEpoll>();
42    }
43
44    ErrCode res = backend_->SetUp();
45    if (res != EVENT_SYS_ERR_OK) {
46        UTILS_LOGE("%{public}s: Backend start failed.", __FUNCTION__);
47        return res;
48    }
49
50    loopReady_ = true;
51    return res;
52}
53
54void IOEventReactor::InsertNodeFront(int fd, IOEventHandler* target)
55{
56    IOEventHandler* h = ioHandlers_[fd].head.get();
57    target->next_ = h->next_;
58    target->prev_ = h;
59    if (h->next_ != nullptr) {
60        h->next_->prev_ = target;
61    }
62    h->next_ = target;
63}
64
65void IOEventReactor::RemoveNode(IOEventHandler* target)
66{
67    target->prev_->next_ = target->next_;
68
69    if (target->next_ != nullptr) {
70        target->next_->prev_ = target->prev_;
71    }
72
73    target->prev_ = nullptr;
74    target->next_ = nullptr;
75}
76
77ErrCode IOEventReactor::AddHandler(IOEventHandler* target)
78{
79    if (target == nullptr) {
80        return EVENT_SYS_ERR_NOT_FOUND;
81    }
82
83    if (target->fd_ == -1) {
84        UTILS_LOGE("%{public}s: Failed, Bad fd.", __FUNCTION__);
85        return EVENT_SYS_ERR_BADF;
86    }
87    if (target->prev_!=nullptr) {
88        UTILS_LOGW("%{public}s: Warning, already started.", __FUNCTION__);
89        return EVENT_SYS_ERR_ALREADY_STARTED;
90    }
91
92    std::lock_guard<std::mutex> lock(mutex_);
93    int fd = target->fd_;
94    if (static_cast<size_t>(fd) > ioHandlers_.size() - 1u) {
95        UTILS_LOGD("%{public}s: Resize when fd: %{public}d", __FUNCTION__, fd);
96        ioHandlers_.resize(fd * EXPANSION_COEFF);
97    }
98
99    InsertNodeFront(fd, target);
100
101    if ((ioHandlers_[fd].events & target->events_) != target->events_) {
102        if (backend_ == nullptr || !UpdateToDemultiplexer(target->fd_)) {
103            UTILS_LOGE("%{public}s: Update fd: %{public}d to backend failed.", __FUNCTION__, target->fd_);
104            return EVENT_SYS_ERR_FAILED;
105        }
106    }
107
108    target->enabled_ = true;
109    count_++;
110    return EVENT_SYS_ERR_OK;
111}
112
113ErrCode IOEventReactor::UpdateHandler(IOEventHandler* target)
114{
115    if (target == nullptr) {
116        return EVENT_SYS_ERR_NOT_FOUND;
117    }
118
119    if (target->fd_ == -1) {
120        UTILS_LOGE("%{public}s: Failed, Bad fd.", __FUNCTION__);
121        return EVENT_SYS_ERR_BADF;
122    }
123
124    if (target->prev_!=nullptr) {
125        if (!HasHandler(target)) {
126            UTILS_LOGE("%{public}s: Failed, handler not found.", __FUNCTION__);
127            return EVENT_SYS_ERR_NOT_FOUND;
128        }
129        if (backend_ == nullptr || !UpdateToDemultiplexer(target->fd_)) {
130            UTILS_LOGE("%{public}s: Update fd: %{public}d to backend failed.", __FUNCTION__, target->fd_);
131            return EVENT_SYS_ERR_FAILED;
132        }
133        return EVENT_SYS_ERR_OK;
134    }
135
136    return AddHandler(target);
137}
138
139ErrCode IOEventReactor::RemoveHandler(IOEventHandler* target)
140{
141    if (target == nullptr) {
142        return EVENT_SYS_ERR_NOT_FOUND;
143    }
144
145    if (target->fd_ == -1) {
146        UTILS_LOGE("%{public}s: Failed, Bad fd.", __FUNCTION__);
147        return EVENT_SYS_ERR_BADF;
148    }
149
150    target->enabled_ = false;
151    std::lock_guard<std::mutex> lock(mutex_);
152
153    if (!HasHandler(target)) {
154        UTILS_LOGE("%{public}s Failed. Handler not found.", __FUNCTION__);
155        target->enabled_=true;
156        return EVENT_SYS_ERR_NOT_FOUND;
157    }
158
159    RemoveNode(target);
160
161    if (backend_ == nullptr || !UpdateToDemultiplexer(target->fd_)) {
162        UTILS_LOGE("%{public}s: Update fd: %{public}d to backend failed.", __FUNCTION__, target->fd_);
163        target->enabled_=true;
164        return EVENT_SYS_ERR_FAILED;
165    }
166
167    count_--;
168    return EVENT_SYS_ERR_OK;
169}
170
171bool IOEventReactor::HasHandler(IOEventHandler* target)
172{
173    for (IOEventHandler* cur = ioHandlers_[target->fd_].head.get(); cur != nullptr; cur = cur->next_) {
174        if (cur == target) {
175            return true;
176        }
177    }
178
179    return false;
180}
181
182ErrCode IOEventReactor::FindHandler(IOEventHandler* target)
183{
184    if (target == nullptr) {
185        return EVENT_SYS_ERR_NOT_FOUND;
186    }
187
188    if (target->fd_ == -1) {
189        UTILS_LOGD("%{public}s: Failed, Bad fd.", __FUNCTION__);
190        return EVENT_SYS_ERR_BADF;
191    }
192
193    std::lock_guard<std::mutex> lock(mutex_);
194
195    if (!HasHandler(target)) {
196        UTILS_LOGD("%{public}s: Handler not found.", __FUNCTION__);
197        return EVENT_SYS_ERR_NOT_FOUND;
198    }
199
200    return EVENT_SYS_ERR_OK;
201}
202
203bool IOEventReactor::UpdateToDemultiplexer(int fd)
204{
205    uint32_t emask = 0u;
206    for (IOEventHandler* cur = ioHandlers_[fd].head.get(); cur != nullptr; cur = cur->next_) {
207        emask |= cur->events_;
208    }
209
210    if (emask == ioHandlers_[fd].events) {
211        UTILS_LOGW("%{public}s: Warning, Interested events not changed.", __FUNCTION__);
212        return true;
213    }
214
215    ErrCode res = backend_->ModifyEvents(fd, emask);
216    if (res != EVENT_SYS_ERR_OK) {
217        UTILS_LOGE("%{public}s: Modify events on backend failed. fd: %{public}d, \
218                   new event: %{public}d, error code: %{public}d", __FUNCTION__, fd, emask, res);
219        return false;
220    }
221
222    ioHandlers_[fd].events = emask;
223    return true;
224}
225
226void IOEventReactor::Execute(const std::vector<EventCallback>& tasks)
227{
228    for (const EventCallback& cb : tasks) {
229        cb();
230    }
231}
232
233ErrCode IOEventReactor::HandleEvents(int fd, EventId event)
234{
235    std::vector<EventCallback> taskQue;
236    {
237        std::lock_guard<std::mutex> lock(mutex_);
238        if (!(ioHandlers_[fd].events & event)) {
239            UTILS_LOGD("%{public}s: Non-interested event: %{public}d with fd: %{public}d, interested events: \
240                       %{public}d", __FUNCTION__, event, fd, ioHandlers_[fd].events);
241            return EVENT_SYS_ERR_BADEVENT;
242        }
243
244        for (IOEventHandler* cur = ioHandlers_[fd].head.get()->next_; cur != nullptr; cur = cur->next_) {
245            if (cur->events_ != Events::EVENT_NONE && cur->enabled_ && (cur->events_ & event) && cur->cb_) {
246                taskQue.push_back(cur->cb_);
247                UTILS_LOGD("%{public}s: Handling event success: %{public}d with fd: %{public}d; \
248                           handler interested events: %{public}d, active-status: %{public}d", \
249                           __FUNCTION__, event, fd, cur->events_, cur->enabled_);
250            } else {
251                UTILS_LOGD("%{public}s: Handling event ignore: %{public}d with fd: %{public}d; \
252                           handler interested events: %{public}d, active-status: %{public}d", \
253                           __FUNCTION__, event, fd, cur->events_, cur->enabled_);
254            }
255        }
256    }
257
258    Execute(taskQue);
259    return EVENT_SYS_ERR_OK;
260}
261
262void IOEventReactor::HandleAll(const std::vector<std::pair<int, EventId>>& events)
263{
264    for (size_t idx = 0u; idx < events.size(); idx++) {
265        int fd = events[idx].first;
266        EventId event = events[idx].second;
267
268        UTILS_LOGD("%{public}s: Processing. Handling event: %{public}d, with fd: %{public}d.", \
269                   __FUNCTION__, event, fd);
270
271        if (HandleEvents(fd, event) == EVENT_SYS_ERR_BADEVENT) {
272            UTILS_LOGD("%{public}s: Received non-interested events-%{public}d.", __FUNCTION__, event);
273        }
274    }
275}
276
277void IOEventReactor::Run(int timeout)
278{
279    std::vector<std::pair<int, EventId>> gotEvents;
280    while (loopReady_) {
281        if (!enabled_) {
282            continue;
283        }
284        ErrCode res;
285        if (timeout == -1) {
286            std::lock_guard<std::mutex> lock(mutex_);
287            if (count_ ==0) {
288                continue;
289            }
290            res = backend_->Polling(timeout, gotEvents);
291        } else {
292            res = backend_->Polling(timeout, gotEvents);
293        }
294
295        switch (res) {
296            case EVENT_SYS_ERR_OK:
297                HandleAll(gotEvents);
298                gotEvents.clear();
299                break;
300            case EVENT_SYS_ERR_NOEVENT:
301                UTILS_LOGD("%{public}s: No events captured.", __FUNCTION__);
302                break;
303            case EVENT_SYS_ERR_FAILED:
304                UTILS_LOGE("%{public}s: Backends failed.", __FUNCTION__);
305                break;
306            default:
307                break;
308        }
309    }
310}
311
312bool IOEventReactor::DoClean(int fd)
313{
314    if (ioHandlers_[fd].head->next_ == nullptr) {
315        return true;
316    }
317
318    for (IOEventHandler* cur = ioHandlers_[fd].head->next_; cur != nullptr; cur = cur->next_) {
319        cur->prev_->next_ = nullptr;
320        cur->prev_ = nullptr;
321        cur->enabled_ = false;
322    }
323
324    if (!UpdateToDemultiplexer(fd)) {
325        UTILS_LOGD("%{public}s: Clear handler list success, while updating backend failed.", __FUNCTION__);
326        return false;
327    }
328
329    return true;
330}
331
332ErrCode IOEventReactor::CleanUp()
333{
334    std::lock_guard<std::mutex> lock(mutex_);
335    ErrCode res = EVENT_SYS_ERR_OK;
336    for (size_t fd = 0u; fd < ioHandlers_.size() && fd <= INT_MAX; fd++) {
337        if (!DoClean(fd)) {
338            UTILS_LOGD("%{public}s Failed.", __FUNCTION__);
339            res = EVENT_SYS_ERR_FAILED;
340        }
341    }
342
343    return res;
344}
345
346ErrCode IOEventReactor::Clean(int fd)
347{
348    if (fd == -1) {
349        UTILS_LOGD("%{public}s: Failed, bad fd.", __FUNCTION__);
350        return EVENT_SYS_ERR_BADF;
351    }
352
353    std::lock_guard<std::mutex> lock(mutex_);
354    if (!DoClean(fd)) {
355        UTILS_LOGD("%{public}s: Failed.", __FUNCTION__);
356        return EVENT_SYS_ERR_FAILED;
357    }
358
359    return EVENT_SYS_ERR_OK;
360}
361
362} // namespace Utils
363} // namespace OHOS
364