1/*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#include "file_descriptor.h"
16#ifndef HDC_HOST
17#include <sys/epoll.h>
18#endif
19
20namespace Hdc {
// Upper bound, in seconds, for a single select()/epoll_wait() round in FileIOOnThread.
static constexpr int SECONDS_TIMEOUT = 5;
22
23HdcFileDescriptor::HdcFileDescriptor(uv_loop_t *loopIn, int fdToRead, void *callerContextIn,
24                                     CallBackWhenRead callbackReadIn, CmdResultCallback callbackFinishIn,
25                                     bool interactiveShell)
26{
27    loop = loopIn;
28    workContinue = true;
29    callbackFinish = callbackFinishIn;
30    callbackRead = callbackReadIn;
31    fdIO = fdToRead;
32    refIO = 0;
33    isInteractive = interactiveShell;
34    callerContext = callerContextIn;
35    if (isInteractive) {
36        std::thread([this]() {
37            HdcFileDescriptor::IOWriteThread(this);
38        }).detach();
39    }
40}
41
42HdcFileDescriptor::~HdcFileDescriptor()
43{
44    workContinue = false;
45    if (isInteractive) {
46        NotifyWrite();
47        uv_sleep(MILL_SECONDS);
48    }
49}
50
51bool HdcFileDescriptor::ReadyForRelease()
52{
53    return refIO == 0;
54}
55
56// just tryCloseFdIo = true, callback will be effect
57void HdcFileDescriptor::StopWorkOnThread(bool tryCloseFdIo, std::function<void()> closeFdCallback)
58{
59    workContinue = false;
60    if (isInteractive) {
61        NotifyWrite();
62    }
63
64    callbackCloseFd = closeFdCallback;
65    if (tryCloseFdIo && refIO > 0) {
66        if (callbackCloseFd != nullptr) {
67            callbackCloseFd();
68        }
69    }
70}
71
72void HdcFileDescriptor::FileIOOnThread(CtxFileIO *ctxIO, int bufSize)
73{
74#ifdef CONFIG_USE_JEMALLOC_DFX_INIF
75    mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
76    mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
77#endif
78    HdcFileDescriptor *thisClass = ctxIO->thisClass;
79    uint8_t *buf = ctxIO->bufIO;
80    bool bFinish = false;
81    bool fetalFinish = false;
82    ssize_t nBytes;
83#ifndef HDC_HOST
84    constexpr int epollSize = 1;
85    int epfd = epoll_create(epollSize);
86    struct epoll_event ev;
87    struct epoll_event events[epollSize];
88    ev.data.fd = thisClass->fdIO;
89    ev.events = EPOLLIN | EPOLLET;
90    epoll_ctl(epfd, EPOLL_CTL_ADD, thisClass->fdIO, &ev);
91#endif
92    while (true) {
93        if (thisClass->workContinue == false) {
94            WRITE_LOG(LOG_INFO, "FileIOOnThread fdIO:%d workContinue false", thisClass->fdIO);
95            bFinish = true;
96            break;
97        }
98
99        if (memset_s(buf, bufSize, 0, bufSize) != EOK) {
100            WRITE_LOG(LOG_DEBUG, "FileIOOnThread buf memset_s fail.");
101            bFinish = true;
102            break;
103        }
104#ifndef HDC_HOST
105        int rc = epoll_wait(epfd, events, epollSize, SECONDS_TIMEOUT * TIME_BASE);
106#else
107        struct timeval timeout;
108        timeout.tv_sec = SECONDS_TIMEOUT;
109        timeout.tv_usec = 0;
110        fd_set rset;
111        FD_ZERO(&rset);
112        FD_SET(thisClass->fdIO, &rset);
113        int rc = select(thisClass->fdIO + 1, &rset, nullptr, nullptr, &timeout);
114#endif
115        if (rc < 0) {
116            WRITE_LOG(LOG_FATAL, "FileIOOnThread select or epoll_wait fdIO:%d error:%d",
117                thisClass->fdIO, errno);
118            if (errno == EINTR || errno == EAGAIN) {
119                continue;
120            }
121            bFinish = true;
122            break;
123        } else if (rc == 0) {
124            WRITE_LOG(LOG_WARN, "FileIOOnThread select rc = 0, timeout.");
125            continue;
126        }
127        nBytes = 0;
128#ifndef HDC_HOST
129        uint32_t event = events[0].events;
130        if ((event & EPOLLIN) && (thisClass->fdIO > 0)) {
131            nBytes = read(thisClass->fdIO, buf, bufSize);
132        }
133        if ((event & EPOLLERR) || (event & EPOLLHUP) || (event & EPOLLRDHUP)) {
134            bFinish = true;
135            fetalFinish = true;
136            if ((nBytes > 0) && !thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
137                WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
138            }
139            break;
140        }
141        if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
142            WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
143            continue;
144        }
145        if (nBytes > 0) {
146            if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
147                WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
148                bFinish = true;
149                break;
150            }
151            continue;
152        } else {
153            WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
154                thisClass->fdIO, nBytes, errno);
155            bFinish = true;
156            fetalFinish = true;
157            break;
158        }
159#else
160        if (thisClass->fdIO > 0) {
161            nBytes = read(thisClass->fdIO, buf, bufSize);
162        }
163        if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
164            WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
165            continue;
166        }
167        if (nBytes > 0) {
168            if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
169                WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
170                bFinish = true;
171                break;
172            }
173            continue;
174        } else {
175            WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
176                thisClass->fdIO, nBytes, errno);
177            bFinish = true;
178            fetalFinish = true;
179            break;
180        }
181#endif
182    }
183#ifndef HDC_HOST
184    if ((thisClass->fdIO > 0) && (epoll_ctl(epfd, EPOLL_CTL_DEL, thisClass->fdIO, nullptr) == -1)) {
185        WRITE_LOG(LOG_INFO, "EPOLL_CTL_DEL fail fd:%d epfd:%d errno:%d",
186            thisClass->fdIO, epfd, errno);
187    }
188    close(epfd);
189#endif
190    if (buf != nullptr) {
191        delete[] buf;
192        buf = nullptr;
193    }
194    delete ctxIO;
195
196    --thisClass->refIO;
197    if (bFinish) {
198        thisClass->workContinue = false;
199        thisClass->callbackFinish(thisClass->callerContext, fetalFinish, STRING_EMPTY);
200    }
201}
202
203int HdcFileDescriptor::LoopReadOnThread()
204{
205#ifdef CONFIG_USE_JEMALLOC_DFX_INIF
206    mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
207    mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
208#endif
209    int readMax = Base::GetMaxBufSizeStable() * 1.2; // 120% of max buf size, use stable size to avoid no buf.
210    auto contextIO = new(std::nothrow) CtxFileIO();
211    auto buf = new(std::nothrow) uint8_t[readMax]();
212    if (!contextIO || !buf) {
213        if (contextIO) {
214            delete contextIO;
215        }
216        if (buf) {
217            delete[] buf;
218        }
219        WRITE_LOG(LOG_FATAL, "Memory alloc failed");
220        callbackFinish(callerContext, true, "Memory alloc failed");
221        return -1;
222    }
223    contextIO->bufIO = buf;
224    contextIO->thisClass = this;
225    ++refIO;
226    std::thread([contextIO, readMax]() {
227        HdcFileDescriptor::FileIOOnThread(contextIO, readMax);
228    }).detach();
229    return 0;
230}
231
232bool HdcFileDescriptor::StartWorkOnThread()
233{
234    if (LoopReadOnThread() < 0) {
235        return false;
236    }
237    return true;
238}
239
240int HdcFileDescriptor::Write(uint8_t *data, int size)
241{
242    if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
243        size = static_cast<int>(HDC_BUF_MAX_BYTES - 1);
244    }
245    if (size <= 0) {
246        WRITE_LOG(LOG_WARN, "Write failed, size:%d", size);
247        return -1;
248    }
249    auto buf = new(std::nothrow) uint8_t[size];
250    if (!buf) {
251        return -1;
252    }
253    if (memcpy_s(buf, size, data, size) != EOK) {
254        delete[] buf;
255        return -1;
256    }
257    return WriteWithMem(buf, size);
258}
259
260// Data's memory must be Malloc, and the callback FREE after this function is completed
261int HdcFileDescriptor::WriteWithMem(uint8_t *data, int size)
262{
263#ifdef CONFIG_USE_JEMALLOC_DFX_INIF
264    mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
265    mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
266#endif
267    auto contextIO = new(std::nothrow) CtxFileIO();
268    if (!contextIO) {
269        delete[] data;
270        WRITE_LOG(LOG_FATAL, "Memory alloc failed");
271        callbackFinish(callerContext, true, "Memory alloc failed");
272        return -1;
273    }
274    contextIO->bufIO = data;
275    contextIO->size = static_cast<size_t>(size);
276    contextIO->thisClass = this;
277    PushWrite(contextIO);
278    NotifyWrite();
279    return size;
280}
281
282void HdcFileDescriptor::IOWriteThread(void *object)
283{
284    HdcFileDescriptor *hfd = reinterpret_cast<HdcFileDescriptor *>(object);
285    while (hfd->workContinue) {
286        hfd->HandleWrite();
287        hfd->WaitWrite();
288    }
289}
290
291void HdcFileDescriptor::PushWrite(CtxFileIO *cfio)
292{
293    std::unique_lock<std::mutex> lock(writeMutex);
294    writeQueue.push(cfio);
295}
296
297CtxFileIO *HdcFileDescriptor::PopWrite()
298{
299    std::unique_lock<std::mutex> lock(writeMutex);
300    CtxFileIO *cfio = nullptr;
301    if (!writeQueue.empty()) {
302        cfio = writeQueue.front();
303        writeQueue.pop();
304    }
305    return cfio;
306}
307
308void HdcFileDescriptor::NotifyWrite()
309{
310    writeCond.notify_one();
311}
312
313void HdcFileDescriptor::WaitWrite()
314{
315    std::unique_lock<std::mutex> lock(writeMutex);
316    writeCond.wait_for(lock, std::chrono::seconds(WAIT_SECONDS), [&]() {
317        return !writeQueue.empty() || !workContinue;
318    });
319}
320
321void HdcFileDescriptor::HandleWrite()
322{
323    CtxFileIO *cfio = nullptr;
324    while ((cfio = PopWrite()) != nullptr) {
325        CtxFileIOWrite(cfio);
326        delete cfio;
327    }
328}
329
330void HdcFileDescriptor::CtxFileIOWrite(CtxFileIO *cfio)
331{
332    std::unique_lock<std::mutex> lock(writeMutex);
333    uint8_t *buf = cfio->bufIO;
334    uint8_t *data = buf;
335    size_t cnt = cfio->size;
336    constexpr int intrmax = 1000;
337    int intrcnt = 0;
338    while (cnt > 0) {
339        ssize_t rc = write(fdIO, data, cnt);
340        if (rc < 0) {
341            if (errno == EINTR || errno == EAGAIN) {
342                if (++intrcnt > intrmax) {
343                    WRITE_LOG(LOG_WARN, "CtxFileIOWrite fdIO:%d interrupt errno:%d", fdIO, errno);
344                    intrcnt = 0;
345                }
346                continue;
347            } else {
348                WRITE_LOG(LOG_FATAL, "CtxFileIOWrite fdIO:%d rc:%d error:%d", fdIO, rc, errno);
349                break;
350            }
351        }
352        data += rc;
353        cnt -= static_cast<size_t>(rc);
354    }
355    delete[] buf;
356}
357}  // namespace Hdc
358