1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "transfer.h"
16 #include "serial_struct.h"
17 #include <sys/stat.h>
18 #ifdef HARMONY_PROJECT
19 #include <lz4.h>
20 #endif
21 #if (!(defined(HOST_MINGW)||defined(HOST_MAC))) && defined(SURPPORT_SELINUX)
22 #include <selinux/selinux.h>
23 #endif
24 #include <memory>
25 namespace Hdc {
// Nanoseconds per second. File timestamps are carried as tv_sec * HDC_TIME_CONVERT_BASE + tv_nsec
// and divided by this constant again to obtain the seconds value handed to uv_fs_futime.
constexpr uint64_t HDC_TIME_CONVERT_BASE = 1000000000;
27
28
HdcTransferBase(HTaskInfo hTaskInfo)29 HdcTransferBase::HdcTransferBase(HTaskInfo hTaskInfo)
30 : HdcTaskBase(hTaskInfo)
31 {
32 ResetCtx(&ctxNow, true);
33 commandBegin = 0;
34 commandData = 0;
35 isStableBuf = false;
36 }
37
~HdcTransferBase()38 HdcTransferBase::~HdcTransferBase()
39 {
40 if (ctxNow.isFdOpen) {
41 WRITE_LOG(LOG_DEBUG, "~HdcTransferBase channelId:%u lastErrno:%u result:%d ioFinish:%d",
42 taskInfo->channelId, ctxNow.lastErrno, ctxNow.openFd, ctxNow.ioFinish);
43
44 if (ctxNow.lastErrno != 0 || (ctxNow.openFd > 0 && !ctxNow.ioFinish)) {
45 CloseFd(ctxNow.openFd);
46 ctxNow.isFdOpen = false;
47 }
48 } else {
49 WRITE_LOG(LOG_DEBUG, "~HdcTransferBase channelId:%u lastErrno:%u ioFinish:%d",
50 taskInfo->channelId, ctxNow.lastErrno, ctxNow.ioFinish);
51 }
52 };
53
CloseFd(ssize_t fd)54 void HdcTransferBase::CloseFd(ssize_t fd)
55 {
56 uv_fs_t fs;
57 uv_fs_close(nullptr, &fs, fd, nullptr);
58 uv_fs_req_cleanup(&fs);
59 }
60
ResetCtx(CtxFile *context, bool full)61 bool HdcTransferBase::ResetCtx(CtxFile *context, bool full)
62 {
63 if (full) {
64 *context = {};
65 context->thisClass = this;
66 context->loop = loopTask;
67 context->cb = OnFileIO;
68 }
69 context->closeNotify = false;
70 context->indexIO = 0;
71 context->lastErrno = 0;
72 context->ioFinish = false;
73 context->closeReqSubmitted = false;
74 context->openFd = -1;
75 return true;
76 }
77
SimpleFileIO(CtxFile *context, uint64_t index, uint8_t *sendBuf, int bytes)78 int HdcTransferBase::SimpleFileIO(CtxFile *context, uint64_t index, uint8_t *sendBuf, int bytes)
79 {
80 StartTraceScope("HdcTransferBase::SimpleFileIO");
81 // The first 8 bytes file offset
82 #ifndef CONFIG_USE_JEMALLOC_DFX_INIF
83 uint8_t *buf = cirbuf.Malloc();
84 #else
85 uint8_t *buf = new uint8_t[bytes + payloadPrefixReserve]();
86 #endif
87 if (buf == nullptr) {
88 WRITE_LOG(LOG_FATAL, "SimpleFileIO buf nullptr");
89 return -1;
90 }
91 CtxFileIO *ioContext = new(std::nothrow) CtxFileIO();
92 if (ioContext == nullptr) {
93 #ifndef CONFIG_USE_JEMALLOC_DFX_INIF
94 cirbuf.Free(buf);
95 #else
96 delete[] buf;
97 #endif
98 WRITE_LOG(LOG_FATAL, "SimpleFileIO ioContext nullptr");
99 return -1;
100 }
101 bool ret = false;
102 while (true) {
103 size_t bufMaxSize = context->isStableBufSize ?
104 static_cast<size_t>(Base::GetUsbffsBulkSizeStable() - payloadPrefixReserve) :
105 static_cast<size_t>(Base::GetUsbffsBulkSize() - payloadPrefixReserve);
106 if (bytes < 0 || static_cast<size_t>(bytes) > bufMaxSize) {
107 WRITE_LOG(LOG_DEBUG, "SimpleFileIO param check failed");
108 break;
109 }
110 if (context->ioFinish) {
111 WRITE_LOG(LOG_DEBUG, "SimpleFileIO to closed IOStream");
112 break;
113 }
114 uv_fs_t *req = &ioContext->fs;
115 ioContext->bufIO = buf + payloadPrefixReserve;
116 ioContext->context = context;
117 req->data = ioContext;
118 ++refCount;
119 if (context->master) { // master just read, and slave just write.when master/read, sendBuf can be nullptr
120 uv_buf_t iov = uv_buf_init(reinterpret_cast<char *>(ioContext->bufIO), bytes);
121 uv_fs_read(context->loop, req, context->openFd, &iov, 1, index, context->cb);
122 } else {
123 // The US_FS_WRITE here must be brought into the actual file offset, which cannot be incorporated with local
124 // accumulated index because UV_FS_WRITE will be executed multiple times and then trigger a callback.
125 if (bytes > 0 && memcpy_s(ioContext->bufIO, bufMaxSize, sendBuf, bytes) != EOK) {
126 WRITE_LOG(LOG_WARN, "SimpleFileIO memcpy error");
127 break;
128 }
129 uv_buf_t iov = uv_buf_init(reinterpret_cast<char *>(ioContext->bufIO), bytes);
130 uv_fs_write(context->loop, req, context->openFd, &iov, 1, index, context->cb);
131 }
132 ret = true;
133 break;
134 }
135 if (!ret) {
136 if (ioContext != nullptr) {
137 delete ioContext;
138 ioContext = nullptr;
139 }
140 #ifndef CONFIG_USE_JEMALLOC_DFX_INIF
141 cirbuf.Free(buf);
142 #else
143 delete[] buf;
144 #endif
145 return -1;
146 }
147 return bytes;
148 }
149
OnFileClose(CtxFile *context)150 void HdcTransferBase::OnFileClose(CtxFile *context)
151 {
152 StartTraceScope("HdcTransferBase::OnFileClose");
153 context->closeReqSubmitted = false;
154 HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
155 if (context->closeNotify) {
156 // close-step2
157 // maybe successful finish or failed finish
158 thisClass->WhenTransferFinish(context);
159 }
160 --thisClass->refCount;
161 return;
162 }
163
SetFileTime(CtxFile *context)164 void HdcTransferBase::SetFileTime(CtxFile *context)
165 {
166 if (!context->transferConfig.holdTimestamp) {
167 return;
168 }
169 if (!context->transferConfig.mtime) {
170 return;
171 }
172 uv_fs_t fs;
173 double aTimeSec = static_cast<long double>(context->transferConfig.atime) / HDC_TIME_CONVERT_BASE;
174 double mTimeSec = static_cast<long double>(context->transferConfig.mtime) / HDC_TIME_CONVERT_BASE;
175 uv_fs_futime(nullptr, &fs, context->openFd, aTimeSec, mTimeSec, nullptr);
176 uv_fs_req_cleanup(&fs);
177 }
178
SendIOPayload(CtxFile *context, uint64_t index, uint8_t *data, int dataSize)179 bool HdcTransferBase::SendIOPayload(CtxFile *context, uint64_t index, uint8_t *data, int dataSize)
180 {
181 TransferPayload payloadHead;
182 string head;
183 int compressSize = 0;
184 int sendBufSize = payloadPrefixReserve + dataSize;
185 uint8_t *sendBuf = data - payloadPrefixReserve;
186 bool ret = false;
187
188 StartTraceScope("HdcTransferBase::SendIOPayload");
189 payloadHead.compressType = context->transferConfig.compressType;
190 payloadHead.uncompressSize = dataSize;
191 payloadHead.index = index;
192 if (dataSize > 0) {
193 switch (payloadHead.compressType) {
194 #ifdef HARMONY_PROJECT
195 case COMPRESS_LZ4: {
196 sendBuf = new uint8_t[sendBufSize]();
197 if (!sendBuf) {
198 WRITE_LOG(LOG_FATAL, "alloc LZ4 buffer failed");
199 return false;
200 }
201 compressSize = LZ4_compress_default((const char *)data, (char *)sendBuf + payloadPrefixReserve,
202 dataSize, dataSize);
203 break;
204 }
205 #endif
206 default: { // COMPRESS_NONE
207 compressSize = dataSize;
208 break;
209 }
210 }
211 }
212 payloadHead.compressSize = compressSize;
213 head = SerialStruct::SerializeToString(payloadHead);
214 if (head.size() + 1 > payloadPrefixReserve) {
215 goto out;
216 }
217 if (EOK != memcpy_s(sendBuf, sendBufSize, head.c_str(), head.size() + 1)) {
218 goto out;
219 }
220 ret = SendToAnother(commandData, sendBuf, payloadPrefixReserve + compressSize) > 0;
221
222 out:
223 if (dataSize > 0 && payloadHead.compressType == COMPRESS_LZ4) {
224 delete[] sendBuf;
225 }
226 return ret;
227 }
228
// Completion callback for the read/write requests queued by SimpleFileIO (installed as
// context->cb by ResetCtx).
//   req - finished request; req->data is the CtxFileIO allocated by SimpleFileIO
// Read completion: forwards the chunk with SendIOPayload and queues the next read until the
// file size is reached or a zero-length read is seen.
// Write completion: marks the transfer finished once indexIO reaches fileSize or a
// zero-length write completes, then applies the file times.
// Once ioFinish is set the file is synced (writes only) and closed, or, when an error was
// recorded or a close is already pending, WhenTransferFinish is called directly.
// In all cases the I/O buffer and the CtxFileIO are released and the reference taken by
// SimpleFileIO is dropped.
void HdcTransferBase::OnFileIO(uv_fs_t *req)
{
    CtxFileIO *contextIO = reinterpret_cast<CtxFileIO *>(req->data);
    CtxFile *context = reinterpret_cast<CtxFile *>(contextIO->context);
    HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
    uint8_t *bufIO = contextIO->bufIO;
    StartTraceScope("HdcTransferBase::OnFileIO");
    uv_fs_req_cleanup(req);
    // Single-pass loop: `break` is used as a structured jump to the finish handling below.
    while (true) {
        if (context->ioFinish) {
            // The transfer was already finished by an earlier completion; only release resources.
            WRITE_LOG(LOG_DEBUG, "OnFileIO finish is true.");
            break;
        }
        if (req->result < 0) {
            // I/O error: remember it, request a close notification and stop the transfer.
            constexpr int bufSize = 1024;
            char buf[bufSize] = { 0 };
            uv_strerror_r((int)req->result, buf, bufSize);
            WRITE_LOG(LOG_DEBUG, "OnFileIO error: %s", buf);
            context->closeNotify = true;
            context->lastErrno = abs(req->result);
            context->ioFinish = true;
            break;
        }
        // req->result is the number of bytes transferred by this request.
        context->indexIO += req->result;
        if (req->fs_type == UV_FS_READ) {
#ifdef HDC_DEBUG
            WRITE_LOG(LOG_DEBUG, "read file data %" PRIu64 "/%" PRIu64 "", context->indexIO,
                      context->fileSize);
#endif // HDC_DEBUG
            // The chunk starts at the offset held before indexIO was advanced above.
            if (!thisClass->SendIOPayload(context, context->indexIO - req->result, bufIO, req->result)) {
                context->ioFinish = true;
                break;
            }
            if (req->result == 0) {
                context->ioFinish = true;
                WRITE_LOG(LOG_DEBUG, "path:%s fd:%d eof",
                    context->localPath.c_str(), context->openFd);
                break;
            }
            if (context->indexIO < context->fileSize) {
                // NOTE(review): the return value of SimpleFileIO is ignored here; if queuing the
                // next read fails, no further completion arrives for this context - confirm
                // whether that case needs explicit handling.
                thisClass->SimpleFileIO(context, context->indexIO, nullptr, context->isStableBufSize ?
                    (Base::GetMaxBufSizeStable() * thisClass->maxTransferBufFactor) :
                    (Base::GetMaxBufSize() * thisClass->maxTransferBufFactor));
            } else {
                context->ioFinish = true;
            }
        } else if (req->fs_type == UV_FS_WRITE) {  // write
#ifdef HDC_DEBUG
            WRITE_LOG(LOG_DEBUG, "write file data %" PRIu64 "/%" PRIu64 "", context->indexIO,
                      context->fileSize);
#endif // HDC_DEBUG
            if (context->indexIO >= context->fileSize || req->result == 0) {
                // The reading side finishes reading first but must not report the finish itself,
                // because the writing side may still be busy. Only the writing side, once it has
                // received everything, requests the close notification.
                context->closeNotify = true;
                context->ioFinish = true;
                thisClass->SetFileTime(context);
            }
        } else {
            // Any other request type ends the transfer.
            context->ioFinish = true;
        }
        break;
    }
    if (context->ioFinish) {
        // close-step1
        // Extra reference for the close step; dropped by OnFileClose or in the else branch below.
        ++thisClass->refCount;
        if (req->fs_type == UV_FS_WRITE) {
            // Flush written data before closing. This local `req` shadows the parameter.
            uv_fs_t req = {};
            uv_fs_fsync(nullptr, &req, context->openFd, nullptr);
            uv_fs_req_cleanup(&req);
        }
        WRITE_LOG(LOG_DEBUG, "channelId:%u result:%d, closeReqSubmitted:%d",
                  thisClass->taskInfo->channelId, context->openFd, context->closeReqSubmitted);
        if (context->lastErrno == 0 && !context->closeReqSubmitted) {
            context->closeReqSubmitted = true;
            WRITE_LOG(LOG_DEBUG, "OnFileIO fs_close, channelId:%u", thisClass->taskInfo->channelId);
            uv_fs_t closeReq = {};
            uv_fs_close(nullptr, &closeReq, context->openFd, nullptr);
            uv_fs_req_cleanup(&closeReq);
            OnFileClose(context);
            context->isFdOpen = false;
        } else {
            // An error was recorded or a close is already pending: the descriptor is not closed
            // here; the finish is reported directly.
            thisClass->WhenTransferFinish(context);
            --thisClass->refCount;
        }
    }
    // Release the buffer handed out by SimpleFileIO; bufIO points payloadPrefixReserve bytes
    // into that allocation.
#ifndef CONFIG_USE_JEMALLOC_DFX_INIF
    thisClass->cirbuf.Free(bufIO - payloadPrefixReserve);
#else
    delete [] (bufIO - payloadPrefixReserve);
#endif
    // Drop the reference taken when this request was queued.
    --thisClass->refCount;
    delete contextIO;  // req is embedded in the CtxFileIO structure, so it needs no separate release
}
323
OnFileOpenFailed(CtxFile *context)324 void HdcTransferBase::OnFileOpenFailed(CtxFile *context)
325 {
326 HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
327 if (context->isDir && context->master) {
328 uint8_t payload = 1;
329 thisClass->CommandDispatch(CMD_FILE_FINISH, &payload, 1);
330 } else if (context->isDir && !context->master) {
331 uint8_t payload = 1;
332 thisClass->SendToAnother(CMD_FILE_FINISH, &payload, 1);
333 } else {
334 thisClass->TaskFinish();
335 }
336 return;
337 }
338
OnFileOpen(uv_fs_t *req)339 void HdcTransferBase::OnFileOpen(uv_fs_t *req)
340 {
341 std::unique_ptr<uv_fs_t> uptrReq(req);
342 CtxFile *context = (CtxFile *)req->data;
343 HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
344 StartTraceScope("HdcTransferBase::OnFileOpen");
345 uv_fs_req_cleanup(req);
346 WRITE_LOG(LOG_DEBUG, "Filemod openfile:%s channelId:%u result:%d",
347 context->localPath.c_str(), thisClass->taskInfo->channelId, req->result);
348 --thisClass->refCount;
349 if (req->result <= 0) {
350 constexpr int bufSize = 1024;
351 char buf[bufSize] = { 0 };
352 uv_strerror_r((int)req->result, buf, bufSize);
353 thisClass->LogMsg(MSG_FAIL, "Error opening file: %s, path:%s", buf,
354 context->localPath.c_str());
355 WRITE_LOG(LOG_FATAL, "open path:%s error:%s", context->localPath.c_str(), buf);
356 OnFileOpenFailed(context);
357 return;
358 }
359 thisClass->ResetCtx(context);
360 context->isFdOpen = true;
361 context->openFd = req->result;
362 if (context->master) { // master just read, and slave just write.
363 // init master
364 uv_fs_t fs = {};
365 uv_fs_fstat(nullptr, &fs, context->openFd, nullptr);
366 WRITE_LOG(LOG_DEBUG, "uv_fs_fstat result:%d fileSize:%llu",
367 context->openFd, fs.statbuf.st_size);
368 TransferConfig &st = context->transferConfig;
369 st.fileSize = fs.statbuf.st_size;
370 st.optionalName = context->localName;
371 if (st.holdTimestamp) {
372 st.atime = fs.statbuf.st_atim.tv_sec * HDC_TIME_CONVERT_BASE + fs.statbuf.st_atim.tv_nsec;
373 st.mtime = fs.statbuf.st_mtim.tv_sec * HDC_TIME_CONVERT_BASE + fs.statbuf.st_mtim.tv_nsec;
374 }
375 st.path = context->remotePath;
376 // update ctxNow=context child value
377 context->fileSize = st.fileSize;
378 context->fileMode.perm = fs.statbuf.st_mode;
379 context->fileMode.uId = fs.statbuf.st_uid;
380 context->fileMode.gId = fs.statbuf.st_gid;
381 #if (!(defined(HOST_MINGW)||defined(HOST_MAC))) && defined(SURPPORT_SELINUX)
382 char *con = nullptr;
383 getfilecon(context->localPath.c_str(), &con);
384 if (con != nullptr) {
385 context->fileMode.context = con;
386 freecon(con);
387 }
388 #endif
389 uv_fs_req_cleanup(&fs);
390 thisClass->CheckMaster(context);
391 } else { // write
392 if (context->fileModeSync) {
393 FileMode &mode = context->fileMode;
394 uv_fs_t fs = {};
395 uv_fs_chmod(nullptr, &fs, context->localPath.c_str(), mode.perm, nullptr);
396 uv_fs_chown(nullptr, &fs, context->localPath.c_str(), mode.uId, mode.gId, nullptr);
397 uv_fs_req_cleanup(&fs);
398
399 #if (!(defined(HOST_MINGW)||defined(HOST_MAC))) && defined(SURPPORT_SELINUX)
400 if (!mode.context.empty()) {
401 WRITE_LOG(LOG_DEBUG, "setfilecon from master = %s", mode.context.c_str());
402 setfilecon(context->localPath.c_str(), mode.context.c_str());
403 }
404 #endif
405 }
406 union FeatureFlagsUnion f{};
407 if (!thisClass->AddFeatures(f)) {
408 WRITE_LOG(LOG_FATAL, "AddFeatureFlag failed");
409 thisClass->SendToAnother(thisClass->commandBegin, nullptr, 0);
410 } else {
411 thisClass->SendToAnother(thisClass->commandBegin, f.raw, sizeof(f));
412 }
413 }
414 }
415
MatchPackageExtendName(string fileName, string extName)416 bool HdcTransferBase::MatchPackageExtendName(string fileName, string extName)
417 {
418 bool match = false;
419 int subfixIndex = fileName.rfind(extName);
420 if ((fileName.size() - subfixIndex) != extName.size()) {
421 return false;
422 }
423 match = true;
424 return match;
425 }
426
427 // filter can be empty
GetSubFiles(const char *path, string filter, vector<string> *out)428 int HdcTransferBase::GetSubFiles(const char *path, string filter, vector<string> *out)
429 {
430 int retNum = 0;
431 uv_fs_t req = {};
432 uv_dirent_t dent;
433 vector<string> filterStrings;
434 if (!strlen(path)) {
435 return retNum;
436 }
437 if (filter.size()) {
438 Base::SplitString(filter, ";", filterStrings);
439 }
440
441 if (uv_fs_scandir(nullptr, &req, path, 0, nullptr) < 0) {
442 uv_fs_req_cleanup(&req);
443 return retNum;
444 }
445 while (uv_fs_scandir_next(&req, &dent) != UV_EOF) {
446 // Skip. File
447 if (strcmp(dent.name, ".") == 0 || strcmp(dent.name, "..") == 0) {
448 continue;
449 }
450 if (!(static_cast<uint32_t>(dent.type) & UV_DIRENT_FILE)) {
451 continue;
452 }
453 string fileName = dent.name;
454 for (auto &&s : filterStrings) {
455 int subfixIndex = fileName.rfind(s);
456 if ((fileName.size() - subfixIndex) != s.size())
457 continue;
458 string fullPath = string(path) + Base::GetPathSep();
459 fullPath += fileName;
460 out->push_back(fullPath);
461 ++retNum;
462 }
463 }
464 uv_fs_req_cleanup(&req);
465 return retNum;
466 }
467
468
GetSubFilesRecursively(string path, string currentDirname, vector<string> *out)469 int HdcTransferBase::GetSubFilesRecursively(string path, string currentDirname, vector<string> *out)
470 {
471 int retNum = 0;
472 uv_fs_t req = {};
473 uv_dirent_t dent;
474
475 WRITE_LOG(LOG_DEBUG, "GetSubFiles path = %s currentDirname = %s", path.c_str(), currentDirname.c_str());
476
477 if (!path.size()) {
478 return retNum;
479 }
480
481 if (uv_fs_scandir(nullptr, &req, path.c_str(), 0, nullptr) < 0) {
482 uv_fs_req_cleanup(&req);
483 return retNum;
484 }
485
486 uv_fs_t fs = {};
487 int ret = uv_fs_stat(nullptr, &fs, path.c_str(), nullptr);
488 if (ret == 0) {
489 FileMode mode;
490 mode.fullName = currentDirname;
491 mode.perm = fs.statbuf.st_mode;
492 mode.uId = fs.statbuf.st_uid;
493 mode.gId = fs.statbuf.st_gid;
494
495 #if (!(defined(HOST_MINGW)||defined(HOST_MAC))) && defined(SURPPORT_SELINUX)
496 char *con = nullptr;
497 getfilecon(path.c_str(), &con);
498 if (con != nullptr) {
499 mode.context = con;
500 freecon(con);
501 }
502 #endif
503 ctxNow.dirMode.push_back(mode);
504 }
505 while (uv_fs_scandir_next(&req, &dent) != UV_EOF) {
506 // Skip. File
507 if (strcmp(dent.name, ".") == 0 || strcmp(dent.name, "..") == 0) {
508 continue;
509 }
510 if (!(static_cast<uint32_t>(dent.type) & UV_DIRENT_FILE)) {
511 WRITE_LOG(LOG_DEBUG, "subdir dent.name fileName = %s", dent.name);
512 GetSubFilesRecursively(path + Base::GetPathSep() + dent.name,
513 currentDirname + Base::GetPathSep() + dent.name, out);
514 continue;
515 }
516 string fileName = dent.name;
517 WRITE_LOG(LOG_DEBUG, "GetSubFiles fileName = %s", fileName.c_str());
518
519 out->push_back(currentDirname + Base::GetPathSep() + fileName);
520 }
521 uv_fs_req_cleanup(&req);
522 WRITE_LOG(LOG_DEBUG, "GetSubFiles end.");
523 return retNum;
524 }
525
526
CheckLocalPath(string &localPath, string &optName, string &errStr)527 bool HdcTransferBase::CheckLocalPath(string &localPath, string &optName, string &errStr)
528 {
529 // If optName show this is directory mode, check localPath and try create each layer
530 WRITE_LOG(LOG_DEBUG, "CheckDirectory localPath = %s optName = %s", localPath.c_str(), optName.c_str());
531 if ((optName.find('/') == string::npos) && (optName.find('\\') == string::npos)) {
532 WRITE_LOG(LOG_DEBUG, "Not directory mode optName = %s, return", optName.c_str());
533 return true;
534 }
535 ctxNow.isDir = true;
536 uv_fs_t req;
537 int r = uv_fs_lstat(nullptr, &req, localPath.c_str(), nullptr);
538 mode_t mode = req.statbuf.st_mode;
539 uv_fs_req_cleanup(&req);
540
541 if (r) {
542 vector<string> dirsOflocalPath;
543 string split(1, Base::GetPathSep());
544 Base::SplitString(localPath, split, dirsOflocalPath);
545
546 WRITE_LOG(LOG_DEBUG, "localPath = %s dir layers = %zu", localPath.c_str(), dirsOflocalPath.size());
547 string makedirPath;
548
549 if (!Base::IsAbsolutePath(localPath)) {
550 makedirPath = ".";
551 }
552
553 for (auto dir : dirsOflocalPath) {
554 WRITE_LOG(LOG_DEBUG, "CheckLocalPath create dir = %s", dir.c_str());
555
556 if (dir == ".") {
557 continue;
558 } else {
559 #ifdef _WIN32
560 if (dir.find(":") == 1) {
561 makedirPath = dir;
562 continue;
563 }
564 #endif
565 makedirPath = makedirPath + Base::GetPathSep() + dir;
566 if (!Base::TryCreateDirectory(makedirPath, errStr)) {
567 return false;
568 }
569 }
570 }
571 // set flag to remove first layer directory of filename from master
572 ctxNow.targetDirNotExist = true;
573 } else if (!(mode & S_IFDIR)) {
574 WRITE_LOG(LOG_WARN, "Not a directory, path:%s", localPath.c_str());
575 errStr = "Not a directory, path:";
576 errStr += localPath.c_str();
577 return false;
578 }
579 return true;
580 }
581
CheckFilename(string &localPath, string &optName, string &errStr)582 bool HdcTransferBase::CheckFilename(string &localPath, string &optName, string &errStr)
583 {
584 string localPathBackup = localPath;
585 if (ctxNow.targetDirNotExist) {
586 // If target directory not exist, the first layer directory from master should remove
587 if (optName.find('/') != string::npos) {
588 optName = optName.substr(optName.find('/') + 1);
589 } else if (optName.find('\\') != string::npos) {
590 optName = optName.substr(optName.find('\\') + 1);
591 }
592 }
593 vector<string> dirsOfOptName;
594
595 if (optName.find('/') != string::npos) {
596 Base::SplitString(optName, "/", dirsOfOptName);
597 } else if (optName.find('\\') != string::npos) {
598 Base::SplitString(optName, "\\", dirsOfOptName);
599 } else {
600 WRITE_LOG(LOG_DEBUG, "No need create dir for file = %s", optName.c_str());
601 return true;
602 }
603
604 // If filename still include dir, try create each layer
605 optName = dirsOfOptName.back();
606 dirsOfOptName.pop_back();
607
608 for (auto s : dirsOfOptName) {
609 // Add each layer directory to localPath
610 localPath = localPath + Base::GetPathSep() + s;
611 if (!Base::TryCreateDirectory(localPath, errStr)) {
612 return false;
613 }
614 if (ctxNow.fileModeSync) {
615 string resolvedPath = Base::CanonicalizeSpecPath(localPath);
616 auto pos = resolvedPath.find(localPathBackup);
617 if (pos == 0) {
618 string shortPath = resolvedPath.substr(localPathBackup.size());
619 if (shortPath.at(0) == Base::GetPathSep()) {
620 shortPath = shortPath.substr(1);
621 }
622 WRITE_LOG(LOG_DEBUG, "pos = %zu, shortPath = %s", pos, shortPath.c_str());
623
624 // set mode
625 auto it = ctxNow.dirModeMap.find(shortPath);
626 if (it != ctxNow.dirModeMap.end()) {
627 auto mode = it->second;
628 uv_fs_t fs = {};
629 uv_fs_chmod(nullptr, &fs, localPath.c_str(), mode.perm, nullptr);
630 uv_fs_chown(nullptr, &fs, localPath.c_str(), mode.uId, mode.gId, nullptr);
631 uv_fs_req_cleanup(&fs);
632 #if (!(defined(HOST_MINGW) || defined(HOST_MAC))) && defined(SURPPORT_SELINUX)
633 if (!mode.context.empty()) {
634 WRITE_LOG(LOG_DEBUG, "setfilecon from master = %s", mode.context.c_str());
635 setfilecon(localPath.c_str(), mode.context.c_str());
636 }
637 #endif
638 }
639 }
640 }
641 }
642
643 WRITE_LOG(LOG_DEBUG, "CheckFilename finish localPath:%s optName:%s", localPath.c_str(), optName.c_str());
644 return true;
645 }
646
647 // https://en.cppreference.com/w/cpp/filesystem/is_directory
648 // return true if file exist, false if file not exist
SmartSlavePath(string &cwd, string &localPath, const char *optName)649 bool HdcTransferBase::SmartSlavePath(string &cwd, string &localPath, const char *optName)
650 {
651 string errStr;
652 if (taskInfo->serverOrDaemon) {
653 // slave and server
654 ExtractRelativePath(cwd, localPath);
655 }
656 mode_t mode = mode_t(~S_IFMT);
657 if (Base::CheckDirectoryOrPath(localPath.c_str(), true, false, errStr, mode)) {
658 WRITE_LOG(LOG_DEBUG, "%s", errStr.c_str());
659 return true;
660 }
661
662 uv_fs_t req;
663 int r = uv_fs_lstat(nullptr, &req, localPath.c_str(), nullptr);
664 uv_fs_req_cleanup(&req);
665 if (r == 0 && (req.statbuf.st_mode & S_IFDIR)) { // is dir
666 localPath = localPath + Base::GetPathSep() + optName;
667 }
668 if (r != 0 && (localPath.back() == Base::GetPathSep())) { // not exist and is dir
669 localPath = localPath + optName;
670 }
671 return false;
672 }
673
RecvIOPayload(CtxFile *context, uint8_t *data, int dataSize)674 bool HdcTransferBase::RecvIOPayload(CtxFile *context, uint8_t *data, int dataSize)
675 {
676 if (dataSize < static_cast<int>(payloadPrefixReserve)) {
677 WRITE_LOG(LOG_WARN, "unable to parse TransferPayload: invalid dataSize %d", dataSize);
678 return false;
679 }
680 uint8_t *clearBuf = nullptr;
681 string serialString(reinterpret_cast<char *>(data), payloadPrefixReserve);
682 TransferPayload pld;
683 Base::ZeroStruct(pld);
684 bool ret = false;
685 SerialStruct::ParseFromString(pld, serialString);
686 int clearSize = 0;
687 StartTraceScope("HdcTransferBase::RecvIOPayload");
688 if (pld.compressSize > static_cast<uint32_t>(dataSize) || pld.uncompressSize > MAX_SIZE_IOBUF) {
689 WRITE_LOG(LOG_FATAL, "compress size is greater than the dataSize. pld.compressSize = %d", pld.compressSize);
690 return false;
691 }
692 if (pld.compressSize > 0) {
693 switch (pld.compressType) {
694 #ifdef HARMONY_PROJECT
695 case COMPRESS_LZ4: {
696 clearBuf = new uint8_t[pld.uncompressSize]();
697 if (!clearBuf) {
698 WRITE_LOG(LOG_FATAL, "alloc LZ4 buffer failed");
699 return false;
700 }
701 clearSize = LZ4_decompress_safe((const char *)data + payloadPrefixReserve, (char *)clearBuf,
702 pld.compressSize, pld.uncompressSize);
703 break;
704 }
705 #endif
706 default: { // COMPRESS_NONE
707 clearBuf = data + payloadPrefixReserve;
708 clearSize = pld.compressSize;
709 break;
710 }
711 }
712 }
713 while (true) {
714 if (static_cast<uint32_t>(clearSize) != pld.uncompressSize || dataSize - payloadPrefixReserve < clearSize) {
715 WRITE_LOG(LOG_WARN, "invalid data size for fileIO: %d", clearSize);
716 break;
717 }
718 if (SimpleFileIO(context, pld.index, clearBuf, clearSize) < 0) {
719 break;
720 }
721 ret = true;
722 break;
723 }
724 if (pld.compressSize > 0 && pld.compressType != COMPRESS_NONE) {
725 delete[] clearBuf;
726 }
727 return ret;
728 }
729
CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)730 bool HdcTransferBase::CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
731 {
732 StartTraceScope("HdcTransferBase::CommandDispatch");
733 bool ret = true;
734 while (true) {
735 if (command == commandBegin) {
736 CtxFile *context = &ctxNow;
737 if (!CheckFeatures(context, payload, payloadSize)) {
738 WRITE_LOG(LOG_FATAL, "CommandDispatch CheckFeatures command:%u", command);
739 ret = false;
740 break;
741 }
742 int ioRet = SimpleFileIO(context, context->indexIO, nullptr, (context->isStableBufSize) ?
743 Base::GetMaxBufSizeStable() * maxTransferBufFactor :
744 Base::GetMaxBufSize() * maxTransferBufFactor);
745 if (ioRet < 0) {
746 WRITE_LOG(LOG_FATAL, "CommandDispatch SimpleFileIO ioRet:%d", ioRet);
747 ret = false;
748 break;
749 }
750 context->transferBegin = Base::GetRuntimeMSec();
751 } else if (command == commandData) {
752 if (static_cast<uint32_t>(payloadSize) > HDC_BUF_MAX_BYTES || payloadSize < 0) {
753 WRITE_LOG(LOG_FATAL, "CommandDispatch payloadSize:%d", payloadSize);
754 ret = false;
755 break;
756 }
757 // Note, I will trigger FileIO after multiple times.
758 CtxFile *context = &ctxNow;
759 if (!RecvIOPayload(context, payload, payloadSize)) {
760 WRITE_LOG(LOG_DEBUG, "RecvIOPayload return false. channelId:%u lastErrno:%u result:%d",
761 taskInfo->channelId, ctxNow.lastErrno, ctxNow.openFd);
762 CloseFd(ctxNow.openFd);
763 ctxNow.isFdOpen = false;
764 HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
765 thisClass->CommandDispatch(CMD_FILE_FINISH, payload, 1);
766 ret = false;
767 break;
768 }
769 } else {
770 // Other subclass commands
771 }
772 break;
773 }
774 return ret;
775 }
776
ExtractRelativePath(string &cwd, string &path)777 void HdcTransferBase::ExtractRelativePath(string &cwd, string &path)
778 {
779 bool absPath = Base::IsAbsolutePath(path);
780 if (!absPath) {
781 path = cwd + path;
782 }
783 }
784
AddFeatures(FeatureFlagsUnion &feature)785 bool HdcTransferBase::AddFeatures(FeatureFlagsUnion &feature)
786 {
787 feature.bits.hugeBuf = !isStableBuf;
788 return true;
789 }
790
CheckFeatures(CtxFile *context, uint8_t *payload, const int payloadSize)791 bool HdcTransferBase::CheckFeatures(CtxFile *context, uint8_t *payload, const int payloadSize)
792 {
793 if (payloadSize == FEATURE_FLAG_MAX_SIZE) {
794 union FeatureFlagsUnion feature{};
795 if (memcpy_s(&feature, sizeof(feature), payload, payloadSize) != EOK) {
796 WRITE_LOG(LOG_FATAL, "CheckFeatures memcpy_s failed");
797 return false;
798 }
799 WRITE_LOG(LOG_DEBUG, "isStableBuf:%d, hugeBuf:%d", isStableBuf, feature.bits.hugeBuf);
800 context->isStableBufSize = isStableBuf ? true : (!feature.bits.hugeBuf);
801 return true;
802 } else if (payloadSize == 0) {
803 WRITE_LOG(LOG_DEBUG, "FileBegin CheckFeatures payloadSize:%d, use default feature.", payloadSize);
804 context->isStableBufSize = true;
805 return true;
806 } else {
807 WRITE_LOG(LOG_FATAL, "CheckFeatures payloadSize:%d", payloadSize);
808 return false;
809 }
810 }
811 } // namespace Hdc
812