1/* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include "file_descriptor.h" 16#ifndef HDC_HOST 17#include <sys/epoll.h> 18#endif 19 20namespace Hdc { 21static const int SECONDS_TIMEOUT = 5; 22 23HdcFileDescriptor::HdcFileDescriptor(uv_loop_t *loopIn, int fdToRead, void *callerContextIn, 24 CallBackWhenRead callbackReadIn, CmdResultCallback callbackFinishIn, 25 bool interactiveShell) 26{ 27 loop = loopIn; 28 workContinue = true; 29 callbackFinish = callbackFinishIn; 30 callbackRead = callbackReadIn; 31 fdIO = fdToRead; 32 refIO = 0; 33 isInteractive = interactiveShell; 34 callerContext = callerContextIn; 35 if (isInteractive) { 36 std::thread([this]() { 37 HdcFileDescriptor::IOWriteThread(this); 38 }).detach(); 39 } 40} 41 42HdcFileDescriptor::~HdcFileDescriptor() 43{ 44 workContinue = false; 45 if (isInteractive) { 46 NotifyWrite(); 47 uv_sleep(MILL_SECONDS); 48 } 49} 50 51bool HdcFileDescriptor::ReadyForRelease() 52{ 53 return refIO == 0; 54} 55 56// just tryCloseFdIo = true, callback will be effect 57void HdcFileDescriptor::StopWorkOnThread(bool tryCloseFdIo, std::function<void()> closeFdCallback) 58{ 59 workContinue = false; 60 if (isInteractive) { 61 NotifyWrite(); 62 } 63 64 callbackCloseFd = closeFdCallback; 65 if (tryCloseFdIo && refIO > 0) { 66 if (callbackCloseFd != nullptr) { 67 callbackCloseFd(); 68 } 69 } 70} 71 72void HdcFileDescriptor::FileIOOnThread(CtxFileIO *ctxIO, int bufSize) 73{ 74#ifdef CONFIG_USE_JEMALLOC_DFX_INIF 75 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE); 76 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE); 77#endif 78 HdcFileDescriptor *thisClass = ctxIO->thisClass; 79 uint8_t *buf = ctxIO->bufIO; 80 bool bFinish = false; 81 bool fetalFinish = false; 82 ssize_t nBytes; 83#ifndef HDC_HOST 84 constexpr int epollSize = 1; 85 int epfd = epoll_create(epollSize); 86 struct epoll_event ev; 87 struct epoll_event events[epollSize]; 88 ev.data.fd = thisClass->fdIO; 89 ev.events = EPOLLIN | EPOLLET; 90 epoll_ctl(epfd, EPOLL_CTL_ADD, thisClass->fdIO, &ev); 91#endif 92 while (true) { 93 if (thisClass->workContinue == false) { 94 WRITE_LOG(LOG_INFO, "FileIOOnThread fdIO:%d workContinue false", thisClass->fdIO); 95 bFinish = true; 96 break; 97 } 98 99 if (memset_s(buf, bufSize, 0, bufSize) != EOK) { 100 WRITE_LOG(LOG_DEBUG, "FileIOOnThread buf memset_s fail."); 101 bFinish = true; 102 break; 103 } 104#ifndef HDC_HOST 105 int rc = epoll_wait(epfd, events, epollSize, SECONDS_TIMEOUT * TIME_BASE); 106#else 107 struct timeval timeout; 108 timeout.tv_sec = SECONDS_TIMEOUT; 109 timeout.tv_usec = 0; 110 fd_set rset; 111 FD_ZERO(&rset); 112 FD_SET(thisClass->fdIO, &rset); 113 int rc = select(thisClass->fdIO + 1, &rset, nullptr, nullptr, &timeout); 114#endif 115 if (rc < 0) { 116 WRITE_LOG(LOG_FATAL, "FileIOOnThread select or epoll_wait fdIO:%d error:%d", 117 thisClass->fdIO, errno); 118 if (errno == EINTR || errno == EAGAIN) { 119 continue; 120 } 121 bFinish = true; 122 break; 123 } else if (rc == 0) { 124 WRITE_LOG(LOG_WARN, "FileIOOnThread select rc = 0, timeout."); 125 continue; 126 } 127 nBytes = 0; 128#ifndef HDC_HOST 129 uint32_t event = events[0].events; 130 if ((event & EPOLLIN) && (thisClass->fdIO > 0)) { 131 nBytes = read(thisClass->fdIO, buf, bufSize); 132 } 133 if ((event & EPOLLERR) || (event & EPOLLHUP) || (event & EPOLLRDHUP)) { 134 bFinish = true; 135 fetalFinish = true; 136 if ((nBytes > 0) && !thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) { 137 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO); 138 } 139 break; 140 } 141 if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) { 142 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO); 143 continue; 144 } 145 if (nBytes > 0) { 146 if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) { 147 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO); 148 bFinish = true; 149 break; 150 } 151 continue; 152 } else { 153 WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d", 154 thisClass->fdIO, nBytes, errno); 155 bFinish = true; 156 fetalFinish = true; 157 break; 158 } 159#else 160 if (thisClass->fdIO > 0) { 161 nBytes = read(thisClass->fdIO, buf, bufSize); 162 } 163 if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) { 164 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO); 165 continue; 166 } 167 if (nBytes > 0) { 168 if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) { 169 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO); 170 bFinish = true; 171 break; 172 } 173 continue; 174 } else { 175 WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d", 176 thisClass->fdIO, nBytes, errno); 177 bFinish = true; 178 fetalFinish = true; 179 break; 180 } 181#endif 182 } 183#ifndef HDC_HOST 184 if ((thisClass->fdIO > 0) && (epoll_ctl(epfd, EPOLL_CTL_DEL, thisClass->fdIO, nullptr) == -1)) { 185 WRITE_LOG(LOG_INFO, "EPOLL_CTL_DEL fail fd:%d epfd:%d errno:%d", 186 thisClass->fdIO, epfd, errno); 187 } 188 close(epfd); 189#endif 190 if (buf != nullptr) { 191 delete[] buf; 192 buf = nullptr; 193 } 194 delete ctxIO; 195 196 --thisClass->refIO; 197 if (bFinish) { 198 thisClass->workContinue = false; 199 thisClass->callbackFinish(thisClass->callerContext, fetalFinish, STRING_EMPTY); 200 } 201} 202 203int HdcFileDescriptor::LoopReadOnThread() 204{ 205#ifdef CONFIG_USE_JEMALLOC_DFX_INIF 206 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE); 207 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE); 208#endif 209 int readMax = Base::GetMaxBufSizeStable() * 1.2; // 120% of max buf size, use stable size to avoid no buf. 210 auto contextIO = new(std::nothrow) CtxFileIO(); 211 auto buf = new(std::nothrow) uint8_t[readMax](); 212 if (!contextIO || !buf) { 213 if (contextIO) { 214 delete contextIO; 215 } 216 if (buf) { 217 delete[] buf; 218 } 219 WRITE_LOG(LOG_FATAL, "Memory alloc failed"); 220 callbackFinish(callerContext, true, "Memory alloc failed"); 221 return -1; 222 } 223 contextIO->bufIO = buf; 224 contextIO->thisClass = this; 225 ++refIO; 226 std::thread([contextIO, readMax]() { 227 HdcFileDescriptor::FileIOOnThread(contextIO, readMax); 228 }).detach(); 229 return 0; 230} 231 232bool HdcFileDescriptor::StartWorkOnThread() 233{ 234 if (LoopReadOnThread() < 0) { 235 return false; 236 } 237 return true; 238} 239 240int HdcFileDescriptor::Write(uint8_t *data, int size) 241{ 242 if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) { 243 size = static_cast<int>(HDC_BUF_MAX_BYTES - 1); 244 } 245 if (size <= 0) { 246 WRITE_LOG(LOG_WARN, "Write failed, size:%d", size); 247 return -1; 248 } 249 auto buf = new(std::nothrow) uint8_t[size]; 250 if (!buf) { 251 return -1; 252 } 253 if (memcpy_s(buf, size, data, size) != EOK) { 254 delete[] buf; 255 return -1; 256 } 257 return WriteWithMem(buf, size); 258} 259 260// Data's memory must be Malloc, and the callback FREE after this function is completed 261int HdcFileDescriptor::WriteWithMem(uint8_t *data, int size) 262{ 263#ifdef CONFIG_USE_JEMALLOC_DFX_INIF 264 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE); 265 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE); 266#endif 267 auto contextIO = new(std::nothrow) CtxFileIO(); 268 if (!contextIO) { 269 delete[] data; 270 WRITE_LOG(LOG_FATAL, "Memory alloc failed"); 271 callbackFinish(callerContext, true, "Memory alloc failed"); 272 return -1; 273 } 274 contextIO->bufIO = data; 275 contextIO->size = static_cast<size_t>(size); 276 contextIO->thisClass = this; 277 PushWrite(contextIO); 278 NotifyWrite(); 279 return size; 280} 281 282void HdcFileDescriptor::IOWriteThread(void *object) 283{ 284 HdcFileDescriptor *hfd = reinterpret_cast<HdcFileDescriptor *>(object); 285 while (hfd->workContinue) { 286 hfd->HandleWrite(); 287 hfd->WaitWrite(); 288 } 289} 290 291void HdcFileDescriptor::PushWrite(CtxFileIO *cfio) 292{ 293 std::unique_lock<std::mutex> lock(writeMutex); 294 writeQueue.push(cfio); 295} 296 297CtxFileIO *HdcFileDescriptor::PopWrite() 298{ 299 std::unique_lock<std::mutex> lock(writeMutex); 300 CtxFileIO *cfio = nullptr; 301 if (!writeQueue.empty()) { 302 cfio = writeQueue.front(); 303 writeQueue.pop(); 304 } 305 return cfio; 306} 307 308void HdcFileDescriptor::NotifyWrite() 309{ 310 writeCond.notify_one(); 311} 312 313void HdcFileDescriptor::WaitWrite() 314{ 315 std::unique_lock<std::mutex> lock(writeMutex); 316 writeCond.wait_for(lock, std::chrono::seconds(WAIT_SECONDS), [&]() { 317 return !writeQueue.empty() || !workContinue; 318 }); 319} 320 321void HdcFileDescriptor::HandleWrite() 322{ 323 CtxFileIO *cfio = nullptr; 324 while ((cfio = PopWrite()) != nullptr) { 325 CtxFileIOWrite(cfio); 326 delete cfio; 327 } 328} 329 330void HdcFileDescriptor::CtxFileIOWrite(CtxFileIO *cfio) 331{ 332 std::unique_lock<std::mutex> lock(writeMutex); 333 uint8_t *buf = cfio->bufIO; 334 uint8_t *data = buf; 335 size_t cnt = cfio->size; 336 constexpr int intrmax = 1000; 337 int intrcnt = 0; 338 while (cnt > 0) { 339 ssize_t rc = write(fdIO, data, cnt); 340 if (rc < 0) { 341 if (errno == EINTR || errno == EAGAIN) { 342 if (++intrcnt > intrmax) { 343 WRITE_LOG(LOG_WARN, "CtxFileIOWrite fdIO:%d interrupt errno:%d", fdIO, errno); 344 intrcnt = 0; 345 } 346 continue; 347 } else { 348 WRITE_LOG(LOG_FATAL, "CtxFileIOWrite fdIO:%d rc:%d error:%d", fdIO, rc, errno); 349 break; 350 } 351 } 352 data += rc; 353 cnt -= static_cast<size_t>(rc); 354 } 355 delete[] buf; 356} 357} // namespace Hdc 358