1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "file_descriptor.h"
16 #ifndef HDC_HOST
17 #include <sys/epoll.h>
18 #endif
19
20 namespace Hdc {
21 static const int SECONDS_TIMEOUT = 5;
22
HdcFileDescriptor(uv_loop_t *loopIn, int fdToRead, void *callerContextIn, CallBackWhenRead callbackReadIn, CmdResultCallback callbackFinishIn, bool interactiveShell)23 HdcFileDescriptor::HdcFileDescriptor(uv_loop_t *loopIn, int fdToRead, void *callerContextIn,
24 CallBackWhenRead callbackReadIn, CmdResultCallback callbackFinishIn,
25 bool interactiveShell)
26 {
27 loop = loopIn;
28 workContinue = true;
29 callbackFinish = callbackFinishIn;
30 callbackRead = callbackReadIn;
31 fdIO = fdToRead;
32 refIO = 0;
33 isInteractive = interactiveShell;
34 callerContext = callerContextIn;
35 if (isInteractive) {
36 std::thread([this]() {
37 HdcFileDescriptor::IOWriteThread(this);
38 }).detach();
39 }
40 }
41
~HdcFileDescriptor()42 HdcFileDescriptor::~HdcFileDescriptor()
43 {
44 workContinue = false;
45 if (isInteractive) {
46 NotifyWrite();
47 uv_sleep(MILL_SECONDS);
48 }
49 }
50
ReadyForRelease()51 bool HdcFileDescriptor::ReadyForRelease()
52 {
53 return refIO == 0;
54 }
55
56 // just tryCloseFdIo = true, callback will be effect
StopWorkOnThread(bool tryCloseFdIo, std::function<void()> closeFdCallback)57 void HdcFileDescriptor::StopWorkOnThread(bool tryCloseFdIo, std::function<void()> closeFdCallback)
58 {
59 workContinue = false;
60 if (isInteractive) {
61 NotifyWrite();
62 }
63
64 callbackCloseFd = closeFdCallback;
65 if (tryCloseFdIo && refIO > 0) {
66 if (callbackCloseFd != nullptr) {
67 callbackCloseFd();
68 }
69 }
70 }
71
FileIOOnThread(CtxFileIO *ctxIO, int bufSize)72 void HdcFileDescriptor::FileIOOnThread(CtxFileIO *ctxIO, int bufSize)
73 {
74 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
75 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
76 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
77 #endif
78 HdcFileDescriptor *thisClass = ctxIO->thisClass;
79 uint8_t *buf = ctxIO->bufIO;
80 bool bFinish = false;
81 bool fetalFinish = false;
82 ssize_t nBytes;
83 #ifndef HDC_HOST
84 constexpr int epollSize = 1;
85 int epfd = epoll_create(epollSize);
86 struct epoll_event ev;
87 struct epoll_event events[epollSize];
88 ev.data.fd = thisClass->fdIO;
89 ev.events = EPOLLIN | EPOLLET;
90 epoll_ctl(epfd, EPOLL_CTL_ADD, thisClass->fdIO, &ev);
91 #endif
92 while (true) {
93 if (thisClass->workContinue == false) {
94 WRITE_LOG(LOG_INFO, "FileIOOnThread fdIO:%d workContinue false", thisClass->fdIO);
95 bFinish = true;
96 break;
97 }
98
99 if (memset_s(buf, bufSize, 0, bufSize) != EOK) {
100 WRITE_LOG(LOG_DEBUG, "FileIOOnThread buf memset_s fail.");
101 bFinish = true;
102 break;
103 }
104 #ifndef HDC_HOST
105 int rc = epoll_wait(epfd, events, epollSize, SECONDS_TIMEOUT * TIME_BASE);
106 #else
107 struct timeval timeout;
108 timeout.tv_sec = SECONDS_TIMEOUT;
109 timeout.tv_usec = 0;
110 fd_set rset;
111 FD_ZERO(&rset);
112 FD_SET(thisClass->fdIO, &rset);
113 int rc = select(thisClass->fdIO + 1, &rset, nullptr, nullptr, &timeout);
114 #endif
115 if (rc < 0) {
116 WRITE_LOG(LOG_FATAL, "FileIOOnThread select or epoll_wait fdIO:%d error:%d",
117 thisClass->fdIO, errno);
118 if (errno == EINTR || errno == EAGAIN) {
119 continue;
120 }
121 bFinish = true;
122 break;
123 } else if (rc == 0) {
124 WRITE_LOG(LOG_WARN, "FileIOOnThread select rc = 0, timeout.");
125 continue;
126 }
127 nBytes = 0;
128 #ifndef HDC_HOST
129 uint32_t event = events[0].events;
130 if ((event & EPOLLIN) && (thisClass->fdIO > 0)) {
131 nBytes = read(thisClass->fdIO, buf, bufSize);
132 }
133 if ((event & EPOLLERR) || (event & EPOLLHUP) || (event & EPOLLRDHUP)) {
134 bFinish = true;
135 fetalFinish = true;
136 if ((nBytes > 0) && !thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
137 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
138 }
139 break;
140 }
141 if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
142 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
143 continue;
144 }
145 if (nBytes > 0) {
146 if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
147 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
148 bFinish = true;
149 break;
150 }
151 continue;
152 } else {
153 WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
154 thisClass->fdIO, nBytes, errno);
155 bFinish = true;
156 fetalFinish = true;
157 break;
158 }
159 #else
160 if (thisClass->fdIO > 0) {
161 nBytes = read(thisClass->fdIO, buf, bufSize);
162 }
163 if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
164 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
165 continue;
166 }
167 if (nBytes > 0) {
168 if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
169 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
170 bFinish = true;
171 break;
172 }
173 continue;
174 } else {
175 WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
176 thisClass->fdIO, nBytes, errno);
177 bFinish = true;
178 fetalFinish = true;
179 break;
180 }
181 #endif
182 }
183 #ifndef HDC_HOST
184 if ((thisClass->fdIO > 0) && (epoll_ctl(epfd, EPOLL_CTL_DEL, thisClass->fdIO, nullptr) == -1)) {
185 WRITE_LOG(LOG_INFO, "EPOLL_CTL_DEL fail fd:%d epfd:%d errno:%d",
186 thisClass->fdIO, epfd, errno);
187 }
188 close(epfd);
189 #endif
190 if (buf != nullptr) {
191 delete[] buf;
192 buf = nullptr;
193 }
194 delete ctxIO;
195
196 --thisClass->refIO;
197 if (bFinish) {
198 thisClass->workContinue = false;
199 thisClass->callbackFinish(thisClass->callerContext, fetalFinish, STRING_EMPTY);
200 }
201 }
202
LoopReadOnThread()203 int HdcFileDescriptor::LoopReadOnThread()
204 {
205 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
206 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
207 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
208 #endif
209 int readMax = Base::GetMaxBufSizeStable() * 1.2; // 120% of max buf size, use stable size to avoid no buf.
210 auto contextIO = new(std::nothrow) CtxFileIO();
211 auto buf = new(std::nothrow) uint8_t[readMax]();
212 if (!contextIO || !buf) {
213 if (contextIO) {
214 delete contextIO;
215 }
216 if (buf) {
217 delete[] buf;
218 }
219 WRITE_LOG(LOG_FATAL, "Memory alloc failed");
220 callbackFinish(callerContext, true, "Memory alloc failed");
221 return -1;
222 }
223 contextIO->bufIO = buf;
224 contextIO->thisClass = this;
225 ++refIO;
226 std::thread([contextIO, readMax]() {
227 HdcFileDescriptor::FileIOOnThread(contextIO, readMax);
228 }).detach();
229 return 0;
230 }
231
StartWorkOnThread()232 bool HdcFileDescriptor::StartWorkOnThread()
233 {
234 if (LoopReadOnThread() < 0) {
235 return false;
236 }
237 return true;
238 }
239
Write(uint8_t *data, int size)240 int HdcFileDescriptor::Write(uint8_t *data, int size)
241 {
242 if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
243 size = static_cast<int>(HDC_BUF_MAX_BYTES - 1);
244 }
245 if (size <= 0) {
246 WRITE_LOG(LOG_WARN, "Write failed, size:%d", size);
247 return -1;
248 }
249 auto buf = new(std::nothrow) uint8_t[size];
250 if (!buf) {
251 return -1;
252 }
253 if (memcpy_s(buf, size, data, size) != EOK) {
254 delete[] buf;
255 return -1;
256 }
257 return WriteWithMem(buf, size);
258 }
259
260 // Data's memory must be Malloc, and the callback FREE after this function is completed
WriteWithMem(uint8_t *data, int size)261 int HdcFileDescriptor::WriteWithMem(uint8_t *data, int size)
262 {
263 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
264 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
265 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
266 #endif
267 auto contextIO = new(std::nothrow) CtxFileIO();
268 if (!contextIO) {
269 delete[] data;
270 WRITE_LOG(LOG_FATAL, "Memory alloc failed");
271 callbackFinish(callerContext, true, "Memory alloc failed");
272 return -1;
273 }
274 contextIO->bufIO = data;
275 contextIO->size = static_cast<size_t>(size);
276 contextIO->thisClass = this;
277 PushWrite(contextIO);
278 NotifyWrite();
279 return size;
280 }
281
IOWriteThread(void *object)282 void HdcFileDescriptor::IOWriteThread(void *object)
283 {
284 HdcFileDescriptor *hfd = reinterpret_cast<HdcFileDescriptor *>(object);
285 while (hfd->workContinue) {
286 hfd->HandleWrite();
287 hfd->WaitWrite();
288 }
289 }
290
PushWrite(CtxFileIO *cfio)291 void HdcFileDescriptor::PushWrite(CtxFileIO *cfio)
292 {
293 std::unique_lock<std::mutex> lock(writeMutex);
294 writeQueue.push(cfio);
295 }
296
PopWrite()297 CtxFileIO *HdcFileDescriptor::PopWrite()
298 {
299 std::unique_lock<std::mutex> lock(writeMutex);
300 CtxFileIO *cfio = nullptr;
301 if (!writeQueue.empty()) {
302 cfio = writeQueue.front();
303 writeQueue.pop();
304 }
305 return cfio;
306 }
307
NotifyWrite()308 void HdcFileDescriptor::NotifyWrite()
309 {
310 writeCond.notify_one();
311 }
312
WaitWrite()313 void HdcFileDescriptor::WaitWrite()
314 {
315 std::unique_lock<std::mutex> lock(writeMutex);
316 writeCond.wait_for(lock, std::chrono::seconds(WAIT_SECONDS), [&]() {
317 return !writeQueue.empty() || !workContinue;
318 });
319 }
320
HandleWrite()321 void HdcFileDescriptor::HandleWrite()
322 {
323 CtxFileIO *cfio = nullptr;
324 while ((cfio = PopWrite()) != nullptr) {
325 CtxFileIOWrite(cfio);
326 delete cfio;
327 }
328 }
329
CtxFileIOWrite(CtxFileIO *cfio)330 void HdcFileDescriptor::CtxFileIOWrite(CtxFileIO *cfio)
331 {
332 std::unique_lock<std::mutex> lock(writeMutex);
333 uint8_t *buf = cfio->bufIO;
334 uint8_t *data = buf;
335 size_t cnt = cfio->size;
336 constexpr int intrmax = 1000;
337 int intrcnt = 0;
338 while (cnt > 0) {
339 ssize_t rc = write(fdIO, data, cnt);
340 if (rc < 0) {
341 if (errno == EINTR || errno == EAGAIN) {
342 if (++intrcnt > intrmax) {
343 WRITE_LOG(LOG_WARN, "CtxFileIOWrite fdIO:%d interrupt errno:%d", fdIO, errno);
344 intrcnt = 0;
345 }
346 continue;
347 } else {
348 WRITE_LOG(LOG_FATAL, "CtxFileIOWrite fdIO:%d rc:%d error:%d", fdIO, rc, errno);
349 break;
350 }
351 }
352 data += rc;
353 cnt -= static_cast<size_t>(rc);
354 }
355 delete[] buf;
356 }
357 } // namespace Hdc
358