1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "file_descriptor.h"
16 #ifndef HDC_HOST
17 #include <sys/epoll.h>
18 #endif
19 
20 namespace Hdc {
// Upper bound, in seconds, for one epoll_wait()/select() call in FileIOOnThread;
// after each timeout the reader loop re-checks the workContinue flag.
static constexpr int SECONDS_TIMEOUT = 5;
22 
HdcFileDescriptor(uv_loop_t *loopIn, int fdToRead, void *callerContextIn, CallBackWhenRead callbackReadIn, CmdResultCallback callbackFinishIn, bool interactiveShell)23 HdcFileDescriptor::HdcFileDescriptor(uv_loop_t *loopIn, int fdToRead, void *callerContextIn,
24                                      CallBackWhenRead callbackReadIn, CmdResultCallback callbackFinishIn,
25                                      bool interactiveShell)
26 {
27     loop = loopIn;
28     workContinue = true;
29     callbackFinish = callbackFinishIn;
30     callbackRead = callbackReadIn;
31     fdIO = fdToRead;
32     refIO = 0;
33     isInteractive = interactiveShell;
34     callerContext = callerContextIn;
35     if (isInteractive) {
36         std::thread([this]() {
37             HdcFileDescriptor::IOWriteThread(this);
38         }).detach();
39     }
40 }
41 
~HdcFileDescriptor()42 HdcFileDescriptor::~HdcFileDescriptor()
43 {
44     workContinue = false;
45     if (isInteractive) {
46         NotifyWrite();
47         uv_sleep(MILL_SECONDS);
48     }
49 }
50 
ReadyForRelease()51 bool HdcFileDescriptor::ReadyForRelease()
52 {
53     return refIO == 0;
54 }
55 
56 // just tryCloseFdIo = true, callback will be effect
StopWorkOnThread(bool tryCloseFdIo, std::function<void()> closeFdCallback)57 void HdcFileDescriptor::StopWorkOnThread(bool tryCloseFdIo, std::function<void()> closeFdCallback)
58 {
59     workContinue = false;
60     if (isInteractive) {
61         NotifyWrite();
62     }
63 
64     callbackCloseFd = closeFdCallback;
65     if (tryCloseFdIo && refIO > 0) {
66         if (callbackCloseFd != nullptr) {
67             callbackCloseFd();
68         }
69     }
70 }
71 
FileIOOnThread(CtxFileIO *ctxIO, int bufSize)72 void HdcFileDescriptor::FileIOOnThread(CtxFileIO *ctxIO, int bufSize)
73 {
74 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
75     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
76     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
77 #endif
78     HdcFileDescriptor *thisClass = ctxIO->thisClass;
79     uint8_t *buf = ctxIO->bufIO;
80     bool bFinish = false;
81     bool fetalFinish = false;
82     ssize_t nBytes;
83 #ifndef HDC_HOST
84     constexpr int epollSize = 1;
85     int epfd = epoll_create(epollSize);
86     struct epoll_event ev;
87     struct epoll_event events[epollSize];
88     ev.data.fd = thisClass->fdIO;
89     ev.events = EPOLLIN | EPOLLET;
90     epoll_ctl(epfd, EPOLL_CTL_ADD, thisClass->fdIO, &ev);
91 #endif
92     while (true) {
93         if (thisClass->workContinue == false) {
94             WRITE_LOG(LOG_INFO, "FileIOOnThread fdIO:%d workContinue false", thisClass->fdIO);
95             bFinish = true;
96             break;
97         }
98 
99         if (memset_s(buf, bufSize, 0, bufSize) != EOK) {
100             WRITE_LOG(LOG_DEBUG, "FileIOOnThread buf memset_s fail.");
101             bFinish = true;
102             break;
103         }
104 #ifndef HDC_HOST
105         int rc = epoll_wait(epfd, events, epollSize, SECONDS_TIMEOUT * TIME_BASE);
106 #else
107         struct timeval timeout;
108         timeout.tv_sec = SECONDS_TIMEOUT;
109         timeout.tv_usec = 0;
110         fd_set rset;
111         FD_ZERO(&rset);
112         FD_SET(thisClass->fdIO, &rset);
113         int rc = select(thisClass->fdIO + 1, &rset, nullptr, nullptr, &timeout);
114 #endif
115         if (rc < 0) {
116             WRITE_LOG(LOG_FATAL, "FileIOOnThread select or epoll_wait fdIO:%d error:%d",
117                 thisClass->fdIO, errno);
118             if (errno == EINTR || errno == EAGAIN) {
119                 continue;
120             }
121             bFinish = true;
122             break;
123         } else if (rc == 0) {
124             WRITE_LOG(LOG_WARN, "FileIOOnThread select rc = 0, timeout.");
125             continue;
126         }
127         nBytes = 0;
128 #ifndef HDC_HOST
129         uint32_t event = events[0].events;
130         if ((event & EPOLLIN) && (thisClass->fdIO > 0)) {
131             nBytes = read(thisClass->fdIO, buf, bufSize);
132         }
133         if ((event & EPOLLERR) || (event & EPOLLHUP) || (event & EPOLLRDHUP)) {
134             bFinish = true;
135             fetalFinish = true;
136             if ((nBytes > 0) && !thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
137                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
138             }
139             break;
140         }
141         if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
142             WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
143             continue;
144         }
145         if (nBytes > 0) {
146             if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
147                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
148                 bFinish = true;
149                 break;
150             }
151             continue;
152         } else {
153             WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
154                 thisClass->fdIO, nBytes, errno);
155             bFinish = true;
156             fetalFinish = true;
157             break;
158         }
159 #else
160         if (thisClass->fdIO > 0) {
161             nBytes = read(thisClass->fdIO, buf, bufSize);
162         }
163         if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
164             WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
165             continue;
166         }
167         if (nBytes > 0) {
168             if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
169                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
170                 bFinish = true;
171                 break;
172             }
173             continue;
174         } else {
175             WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
176                 thisClass->fdIO, nBytes, errno);
177             bFinish = true;
178             fetalFinish = true;
179             break;
180         }
181 #endif
182     }
183 #ifndef HDC_HOST
184     if ((thisClass->fdIO > 0) && (epoll_ctl(epfd, EPOLL_CTL_DEL, thisClass->fdIO, nullptr) == -1)) {
185         WRITE_LOG(LOG_INFO, "EPOLL_CTL_DEL fail fd:%d epfd:%d errno:%d",
186             thisClass->fdIO, epfd, errno);
187     }
188     close(epfd);
189 #endif
190     if (buf != nullptr) {
191         delete[] buf;
192         buf = nullptr;
193     }
194     delete ctxIO;
195 
196     --thisClass->refIO;
197     if (bFinish) {
198         thisClass->workContinue = false;
199         thisClass->callbackFinish(thisClass->callerContext, fetalFinish, STRING_EMPTY);
200     }
201 }
202 
LoopReadOnThread()203 int HdcFileDescriptor::LoopReadOnThread()
204 {
205 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
206     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
207     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
208 #endif
209     int readMax = Base::GetMaxBufSizeStable() * 1.2; // 120% of max buf size, use stable size to avoid no buf.
210     auto contextIO = new(std::nothrow) CtxFileIO();
211     auto buf = new(std::nothrow) uint8_t[readMax]();
212     if (!contextIO || !buf) {
213         if (contextIO) {
214             delete contextIO;
215         }
216         if (buf) {
217             delete[] buf;
218         }
219         WRITE_LOG(LOG_FATAL, "Memory alloc failed");
220         callbackFinish(callerContext, true, "Memory alloc failed");
221         return -1;
222     }
223     contextIO->bufIO = buf;
224     contextIO->thisClass = this;
225     ++refIO;
226     std::thread([contextIO, readMax]() {
227         HdcFileDescriptor::FileIOOnThread(contextIO, readMax);
228     }).detach();
229     return 0;
230 }
231 
StartWorkOnThread()232 bool HdcFileDescriptor::StartWorkOnThread()
233 {
234     if (LoopReadOnThread() < 0) {
235         return false;
236     }
237     return true;
238 }
239 
Write(uint8_t *data, int size)240 int HdcFileDescriptor::Write(uint8_t *data, int size)
241 {
242     if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
243         size = static_cast<int>(HDC_BUF_MAX_BYTES - 1);
244     }
245     if (size <= 0) {
246         WRITE_LOG(LOG_WARN, "Write failed, size:%d", size);
247         return -1;
248     }
249     auto buf = new(std::nothrow) uint8_t[size];
250     if (!buf) {
251         return -1;
252     }
253     if (memcpy_s(buf, size, data, size) != EOK) {
254         delete[] buf;
255         return -1;
256     }
257     return WriteWithMem(buf, size);
258 }
259 
260 // Data's memory must be Malloc, and the callback FREE after this function is completed
WriteWithMem(uint8_t *data, int size)261 int HdcFileDescriptor::WriteWithMem(uint8_t *data, int size)
262 {
263 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
264     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
265     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
266 #endif
267     auto contextIO = new(std::nothrow) CtxFileIO();
268     if (!contextIO) {
269         delete[] data;
270         WRITE_LOG(LOG_FATAL, "Memory alloc failed");
271         callbackFinish(callerContext, true, "Memory alloc failed");
272         return -1;
273     }
274     contextIO->bufIO = data;
275     contextIO->size = static_cast<size_t>(size);
276     contextIO->thisClass = this;
277     PushWrite(contextIO);
278     NotifyWrite();
279     return size;
280 }
281 
IOWriteThread(void *object)282 void HdcFileDescriptor::IOWriteThread(void *object)
283 {
284     HdcFileDescriptor *hfd = reinterpret_cast<HdcFileDescriptor *>(object);
285     while (hfd->workContinue) {
286         hfd->HandleWrite();
287         hfd->WaitWrite();
288     }
289 }
290 
PushWrite(CtxFileIO *cfio)291 void HdcFileDescriptor::PushWrite(CtxFileIO *cfio)
292 {
293     std::unique_lock<std::mutex> lock(writeMutex);
294     writeQueue.push(cfio);
295 }
296 
PopWrite()297 CtxFileIO *HdcFileDescriptor::PopWrite()
298 {
299     std::unique_lock<std::mutex> lock(writeMutex);
300     CtxFileIO *cfio = nullptr;
301     if (!writeQueue.empty()) {
302         cfio = writeQueue.front();
303         writeQueue.pop();
304     }
305     return cfio;
306 }
307 
NotifyWrite()308 void HdcFileDescriptor::NotifyWrite()
309 {
310     writeCond.notify_one();
311 }
312 
WaitWrite()313 void HdcFileDescriptor::WaitWrite()
314 {
315     std::unique_lock<std::mutex> lock(writeMutex);
316     writeCond.wait_for(lock, std::chrono::seconds(WAIT_SECONDS), [&]() {
317         return !writeQueue.empty() || !workContinue;
318     });
319 }
320 
HandleWrite()321 void HdcFileDescriptor::HandleWrite()
322 {
323     CtxFileIO *cfio = nullptr;
324     while ((cfio = PopWrite()) != nullptr) {
325         CtxFileIOWrite(cfio);
326         delete cfio;
327     }
328 }
329 
CtxFileIOWrite(CtxFileIO *cfio)330 void HdcFileDescriptor::CtxFileIOWrite(CtxFileIO *cfio)
331 {
332     std::unique_lock<std::mutex> lock(writeMutex);
333     uint8_t *buf = cfio->bufIO;
334     uint8_t *data = buf;
335     size_t cnt = cfio->size;
336     constexpr int intrmax = 1000;
337     int intrcnt = 0;
338     while (cnt > 0) {
339         ssize_t rc = write(fdIO, data, cnt);
340         if (rc < 0) {
341             if (errno == EINTR || errno == EAGAIN) {
342                 if (++intrcnt > intrmax) {
343                     WRITE_LOG(LOG_WARN, "CtxFileIOWrite fdIO:%d interrupt errno:%d", fdIO, errno);
344                     intrcnt = 0;
345                 }
346                 continue;
347             } else {
348                 WRITE_LOG(LOG_FATAL, "CtxFileIOWrite fdIO:%d rc:%d error:%d", fdIO, rc, errno);
349                 break;
350             }
351         }
352         data += rc;
353         cnt -= static_cast<size_t>(rc);
354     }
355     delete[] buf;
356 }
357 }  // namespace Hdc
358