1/* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include "forward.h" 16#include "base.h" 17 18namespace Hdc { 19HdcForwardBase::HdcForwardBase(HTaskInfo hTaskInfo) 20 : HdcTaskBase(hTaskInfo) 21{ 22 fds[0] = -1; 23 fds[1] = -1; 24} 25 26HdcForwardBase::~HdcForwardBase() 27{ 28 WRITE_LOG(LOG_DEBUG, "~HdcForwardBase channelId:%u", taskInfo->channelId); 29}; 30 31bool HdcForwardBase::ReadyForRelease() 32{ 33 if (!HdcTaskBase::ReadyForRelease()) { 34 WRITE_LOG(LOG_WARN, "not ready for release channelId:%u", taskInfo->channelId); 35 return false; 36 } 37 return true; 38} 39 40void HdcForwardBase::StopTask() 41{ 42 ctxPointMutex.lock(); 43 vector<HCtxForward> ctxs; 44 map<uint32_t, HCtxForward>::iterator iter; 45 for (iter = mapCtxPoint.begin(); iter != mapCtxPoint.end(); ++iter) { 46 HCtxForward ctx = iter->second; 47 ctxs.push_back(ctx); 48 } 49 // FREECONTEXT in the STOP is triggered by the other party sector, no longer notifying each other. 50 mapCtxPoint.clear(); 51 ctxPointMutex.unlock(); 52 for (auto ctx: ctxs) { 53 FreeContext(ctx, 0, false); 54 } 55} 56 57void HdcForwardBase::OnAccept(uv_stream_t *server, HCtxForward ctxClient, uv_stream_t *client) 58{ 59 HCtxForward ctxListen = (HCtxForward)server->data; 60 char buf[BUF_SIZE_DEFAULT] = { 0 }; 61 bool ret = false; 62 while (true) { 63 if (uv_accept(server, client)) { 64 WRITE_LOG(LOG_FATAL, "uv_accept id:%u type:%d remoteParamenters:%s", 65 ctxListen->id, ctxListen->type, ctxListen->remoteParamenters.c_str()); 66 break; 67 } 68 ctxClient->type = ctxListen->type; 69 ctxClient->remoteParamenters = ctxListen->remoteParamenters; 70 int maxSize = sizeof(buf) - forwardParameterBufSize; 71 // clang-format off 72 if (snprintf_s(buf + forwardParameterBufSize, maxSize, maxSize - 1, "%s", 73 ctxClient->remoteParamenters.c_str()) < 0) { 74 break; 75 } 76 WRITE_LOG(LOG_DEBUG, "OnAccept id:%u type:%d remoteParamenters:%s", 77 ctxClient->id, ctxClient->type, ctxClient->remoteParamenters.c_str()); 78 SendToTask(ctxClient->id, CMD_FORWARD_ACTIVE_SLAVE, reinterpret_cast<uint8_t *>(buf), 79 strlen(buf + forwardParameterBufSize) + 9); // 9: pre 8bytes preserve for param bits 80 ret = true; 81 break; 82 } 83 if (!ret) { 84 FreeContext(ctxClient, 0, false); 85 } 86} 87 88void HdcForwardBase::ListenCallback(uv_stream_t *server, const int status) 89{ 90 HCtxForward ctxListen = (HCtxForward)server->data; 91 HdcForwardBase *thisClass = ctxListen->thisClass; 92 uv_stream_t *client = nullptr; 93 94 if (status == -1 || !ctxListen->ready) { 95 WRITE_LOG(LOG_FATAL, "ListenCallback status:%d id:%u ready:%d", 96 status, ctxListen->id, ctxListen->ready); 97 thisClass->FreeContext(ctxListen, 0, false); 98 thisClass->TaskFinish(); 99 return; 100 } 101 HCtxForward ctxClient = (HCtxForward)thisClass->MallocContext(true); 102 if (!ctxClient) { 103 return; 104 } 105 if (ctxListen->type == FORWARD_TCP) { 106 uv_tcp_init(ctxClient->thisClass->loopTask, &ctxClient->tcp); 107 client = (uv_stream_t *)&ctxClient->tcp; 108 } else { 109 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM, 110 uv_pipe_init(ctxClient->thisClass->loopTask, &ctxClient->pipe, 0); 111 client = (uv_stream_t *)&ctxClient->pipe; 112 } 113 thisClass->OnAccept(server, ctxClient, client); 114} 115 116void *HdcForwardBase::MallocContext(bool masterSlave) 117{ 118 HCtxForward ctx = nullptr; 119 if ((ctx = new ContextForward()) == nullptr) { 120 return nullptr; 121 } 122 ctx->id = Base::GetRandomU32(); 123 ctx->masterSlave = masterSlave; 124 ctx->thisClass = this; 125 ctx->fdClass = nullptr; 126 ctx->tcp.data = ctx; 127 ctx->pipe.data = ctx; 128 AdminContext(OP_ADD, ctx->id, ctx); 129 refCount++; 130 return ctx; 131} 132 133void HdcForwardBase::FreeContextCallBack(HCtxForward ctx) 134{ 135 Base::DoNextLoop(loopTask, ctx, [this](const uint8_t flag, string &msg, const void *data) { 136 HCtxForward ctx = (HCtxForward)data; 137 AdminContext(OP_REMOVE, ctx->id, nullptr); 138 if (ctx != nullptr) { 139 WRITE_LOG(LOG_DEBUG, "Finally to delete id:%u", ctx->id); 140 delete ctx; 141 ctx = nullptr; 142 } 143 if (refCount > 0) { 144 --refCount; 145 } 146 }); 147} 148 149void HdcForwardBase::FreeJDWP(HCtxForward ctx) 150{ 151 Base::CloseFd(ctx->fd); 152 if (ctx->fdClass) { 153 ctx->fdClass->StopWorkOnThread(false, nullptr); 154 155 auto funcReqClose = [](uv_idle_t *handle) -> void { 156 uv_close_cb funcIdleHandleClose = [](uv_handle_t *handle) -> void { 157 HCtxForward ctx = (HCtxForward)handle->data; 158 ctx->thisClass->FreeContextCallBack(ctx); 159 delete (uv_idle_t *)handle; 160 }; 161 HCtxForward context = (HCtxForward)handle->data; 162 if (context->fdClass->ReadyForRelease()) { 163 delete context->fdClass; 164 context->fdClass = nullptr; 165 Base::TryCloseHandle((uv_handle_t *)handle, funcIdleHandleClose); 166 } 167 }; 168 Base::IdleUvTask(loopTask, ctx, funcReqClose); 169 } 170} 171 172void HdcForwardBase::FreeContext(HCtxForward ctxIn, const uint32_t id, bool bNotifyRemote) 173{ 174 std::lock_guard<std::mutex> lock(ctxFreeMutex); 175 HCtxForward ctx = nullptr; 176 if (!ctxIn) { 177 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) { 178 WRITE_LOG(LOG_DEBUG, "Query id:%u failed", id); 179 return; 180 } 181 } else { 182 ctx = ctxIn; 183 } 184 WRITE_LOG(LOG_DEBUG, "FreeContext id:%u, bNotifyRemote:%d, finish:%d", 185 ctx->id, bNotifyRemote, ctx->finish); 186 if (ctx->finish) { 187 return; 188 } 189 if (bNotifyRemote) { 190 SendToTask(ctx->id, CMD_FORWARD_FREE_CONTEXT, nullptr, 0); 191 } 192 uv_close_cb funcHandleClose = [](uv_handle_t *handle) -> void { 193 HCtxForward ctx = (HCtxForward)handle->data; 194 ctx->thisClass->FreeContextCallBack(ctx); 195 }; 196 switch (ctx->type) { 197 case FORWARD_TCP: 198 case FORWARD_JDWP: 199 case FORWARD_ARK: 200 Base::TryCloseHandle((uv_handle_t *)&ctx->tcp, true, funcHandleClose); 201 break; 202 case FORWARD_ABSTRACT: 203 case FORWARD_RESERVED: 204 case FORWARD_FILESYSTEM: 205 Base::TryCloseHandle((uv_handle_t *)&ctx->pipe, true, funcHandleClose); 206 break; 207 case FORWARD_DEVICE: { 208 FreeJDWP(ctx); 209 break; 210 } 211 default: 212 break; 213 } 214 ctx->finish = true; 215} 216 217bool HdcForwardBase::SendToTask(const uint32_t cid, const uint16_t command, uint8_t *bufPtr, const int bufSize) 218{ 219 StartTraceScope("HdcForwardBase::SendToTask"); 220 bool ret = false; 221 // usually MAX_SIZE_IOBUF*2 from HdcFileDescriptor maxIO 222 if (bufSize > Base::GetMaxBufSizeStable() * BUF_MULTIPLE) { 223 WRITE_LOG(LOG_FATAL, "SendToTask bufSize:%d", bufSize); 224 return false; 225 } 226 auto newBuf = new uint8_t[bufSize + BUF_EXTEND_SIZE]; 227 if (!newBuf) { 228 return false; 229 } 230 *reinterpret_cast<uint32_t *>(newBuf) = htonl(cid); 231 if (bufSize > 0 && bufPtr != nullptr && memcpy_s(newBuf + BUF_EXTEND_SIZE, bufSize, bufPtr, bufSize) != EOK) { 232 delete[] newBuf; 233 return false; 234 } 235 ret = SendToAnother(command, newBuf, bufSize + BUF_EXTEND_SIZE); 236 delete[] newBuf; 237 return ret; 238} 239 240// Forward flow is small and frequency is fast 241void HdcForwardBase::AllocForwardBuf(uv_handle_t *handle, size_t sizeSuggested, uv_buf_t *buf) 242{ 243 size_t size = sizeSuggested; 244 if (size > MAX_USBFFS_BULK) { 245 size = MAX_USBFFS_BULK; 246 } 247 buf->base = (char *)new char[size]; 248 if (buf->base) { 249 buf->len = (size > 0) ? (size - 1) : 0; 250 } else { 251 WRITE_LOG(LOG_WARN, "AllocForwardBuf == null"); 252 } 253} 254 255void HdcForwardBase::ReadForwardBuf(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) 256{ 257 HCtxForward ctx = (HCtxForward)stream->data; 258 if (nread < 0) { 259 WRITE_LOG(LOG_INFO, "ReadForwardBuf nread:%zd id:%u", nread, ctx->id); 260 ctx->thisClass->FreeContext(ctx, 0, true); 261 delete[] buf->base; 262 return; 263 } 264 if (nread == 0) { 265 WRITE_LOG(LOG_INFO, "ReadForwardBuf nread:0 id:%u", ctx->id); 266 delete[] buf->base; 267 return; 268 } 269 ctx->thisClass->SendToTask(ctx->id, CMD_FORWARD_DATA, (uint8_t *)buf->base, nread); 270 // clear 271 delete[] buf->base; 272} 273 274void HdcForwardBase::ConnectTarget(uv_connect_t *connection, int status) 275{ 276 HCtxForward ctx = (HCtxForward)connection->data; 277 HdcForwardBase *thisClass = ctx->thisClass; 278 delete connection; 279 if (status < 0) { 280 constexpr int bufSize = 1024; 281 char buf[bufSize] = { 0 }; 282 uv_err_name_r(status, buf, bufSize); 283 WRITE_LOG(LOG_WARN, "Forward connect result:%d error:%s", status, buf); 284 } 285 thisClass->SetupPointContinue(ctx, status); 286} 287 288bool HdcForwardBase::CheckNodeInfo(const char *nodeInfo, string as[2]) 289{ 290 string str = nodeInfo; 291 size_t strLen = str.size(); 292 if (strLen < 1) { 293 return false; 294 } 295 size_t pos = str.find(':'); 296 if (pos != string::npos) { 297 if (pos == 0 || pos == strLen - 1) { 298 return false; 299 } 300 as[0] = str.substr(0, pos); 301 as[1] = str.substr(pos + 1); 302 } else { 303 return false; 304 } 305 if (as[0] == "tcp") { 306 if (as[1].size() > std::to_string(MAX_IP_PORT).size()) { 307 return false; 308 } 309 int port = atoi(as[1].c_str()); 310 if (port <= 0 || port > MAX_IP_PORT) { 311 return false; 312 } 313 } 314 return true; 315} 316 317bool HdcForwardBase::SetupPointContinue(HCtxForward ctx, int status) 318{ 319 if (ctx->checkPoint) { 320 // send to active 321 uint8_t flag = status > 0; 322 SendToTask(ctx->id, CMD_FORWARD_CHECK_RESULT, &flag, 1); 323 FreeContext(ctx, 0, false); 324 return true; 325 } 326 if (status < 0) { 327 FreeContext(ctx, 0, true); 328 return false; 329 } 330 // send to active 331 if (!SendToTask(ctx->id, CMD_FORWARD_ACTIVE_MASTER, nullptr, 0)) { 332 WRITE_LOG(LOG_FATAL, "SetupPointContinue SendToTask failed id:%u", ctx->id); 333 FreeContext(ctx, 0, true); 334 return false; 335 } 336 return DoForwardBegin(ctx); 337} 338 339bool HdcForwardBase::DetechForwardType(HCtxForward ctxPoint) 340{ 341 string &sFType = ctxPoint->localArgs[0]; 342 string &sNodeCfg = ctxPoint->localArgs[1]; 343 // string to enum 344 if (sFType == "tcp") { 345 ctxPoint->type = FORWARD_TCP; 346 } else if (sFType == "dev") { 347 ctxPoint->type = FORWARD_DEVICE; 348 } else if (sFType == "localabstract") { 349 // daemon shell: /system/bin/socat abstract-listen:linux-abstract - 350 // daemon shell: /system/bin/socat - abstract-connect:linux-abstract 351 // host: hdc fport tcp:8080 localabstract:linux-abstract 352 ctxPoint->type = FORWARD_ABSTRACT; 353 } else if (sFType == "localreserved") { 354 sNodeCfg = harmonyReservedSocketPrefix + sNodeCfg; 355 ctxPoint->type = FORWARD_RESERVED; 356 } else if (sFType == "localfilesystem") { 357 sNodeCfg = filesystemSocketPrefix + sNodeCfg; 358 ctxPoint->type = FORWARD_FILESYSTEM; 359 } else if (sFType == "jdwp") { 360 ctxPoint->type = FORWARD_JDWP; 361 } else if (sFType == "ark") { 362 ctxPoint->type = FORWARD_ARK; 363 } else { 364 return false; 365 } 366 return true; 367} 368 369bool HdcForwardBase::SetupTCPPoint(HCtxForward ctxPoint) 370{ 371 string &sNodeCfg = ctxPoint->localArgs[1]; 372 int port = atoi(sNodeCfg.c_str()); 373 ctxPoint->tcp.data = ctxPoint; 374 uv_tcp_init(loopTask, &ctxPoint->tcp); 375 struct sockaddr_in addr; 376 if (ctxPoint->masterSlave) { 377 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface 378 uv_tcp_bind(&ctxPoint->tcp, (const struct sockaddr *)&addr, 0); 379 if (uv_listen((uv_stream_t *)&ctxPoint->tcp, UV_LISTEN_LBACKOG, ListenCallback)) { 380 ctxPoint->lastError = "TCP Port listen failed at " + sNodeCfg; 381 return false; 382 } 383 } else { 384 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface 385 uv_connect_t *conn = new(std::nothrow) uv_connect_t(); 386 if (conn == nullptr) { 387 WRITE_LOG(LOG_FATAL, "SetupTCPPoint new conn failed"); 388 return false; 389 } 390 conn->data = ctxPoint; 391 uv_tcp_connect(conn, (uv_tcp_t *)&ctxPoint->tcp, (const struct sockaddr *)&addr, ConnectTarget); 392 } 393 return true; 394} 395 396bool HdcForwardBase::SetupDevicePoint(HCtxForward ctxPoint) 397{ 398 uint8_t flag = 1; 399 string &sNodeCfg = ctxPoint->localArgs[1]; 400 string resolvedPath = Base::CanonicalizeSpecPath(sNodeCfg); 401 if ((ctxPoint->fd = open(resolvedPath.c_str(), O_RDWR)) < 0) { 402 ctxPoint->lastError = "Open unix-dev failed"; 403 flag = -1; 404 } 405 auto funcRead = [&](const void *a, uint8_t *b, const int c) -> bool { 406 HCtxForward ctx = (HCtxForward)a; 407 return SendToTask(ctx->id, CMD_FORWARD_DATA, b, c); 408 }; 409 auto funcFinish = [&](const void *a, const bool b, const string c) -> bool { 410 HCtxForward ctx = (HCtxForward)a; 411 WRITE_LOG(LOG_DEBUG, "funcFinish id:%u ret:%d reason:%s", ctx->id, b, c.c_str()); 412 FreeContext(ctx, 0, true); 413 return false; 414 }; 415 ctxPoint->fdClass = new(std::nothrow) HdcFileDescriptor(loopTask, ctxPoint->fd, ctxPoint, funcRead, 416 funcFinish, true); 417 if (ctxPoint->fdClass == nullptr) { 418 WRITE_LOG(LOG_FATAL, "SetupDevicePoint new ctxPoint->fdClass failed"); 419 return false; 420 } 421 SetupPointContinue(ctxPoint, flag); 422 return true; 423} 424 425bool HdcForwardBase::LocalAbstractConnect(uv_pipe_t *pipe, string &sNodeCfg) 426{ 427 bool abstractRet = false; 428#ifndef _WIN32 429 int s = 0; 430 do { 431 if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) { 432 break; 433 } 434 fcntl(s, F_SETFD, FD_CLOEXEC); 435 struct sockaddr_un addr; 436 Base::ZeroStruct(addr); 437 int addrLen = sNodeCfg.size() + offsetof(struct sockaddr_un, sun_path) + 1; 438 addr.sun_family = AF_LOCAL; 439 addr.sun_path[0] = 0; 440 441 if (memcpy_s(addr.sun_path + 1, sizeof(addr.sun_path) - 1, sNodeCfg.c_str(), sNodeCfg.size()) != EOK) { 442 break; 443 }; 444 struct timeval timeout = { 3, 0 }; 445 setsockopt(s, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); 446 if (connect(s, reinterpret_cast<struct sockaddr *>(&addr), addrLen) < 0) { 447 WRITE_LOG(LOG_FATAL, "LocalAbstractConnect failed errno:%d", errno); 448 break; 449 } 450 if (uv_pipe_open(pipe, s)) { 451 break; 452 } 453 abstractRet = true; 454 } while (false); 455 if (!abstractRet) { 456 Base::CloseFd(s); 457 } 458#endif 459 return abstractRet; 460} 461 462bool HdcForwardBase::SetupFilePoint(HCtxForward ctxPoint) 463{ 464 string &sNodeCfg = ctxPoint->localArgs[1]; 465 ctxPoint->pipe.data = ctxPoint; 466 uv_pipe_init(loopTask, &ctxPoint->pipe, 0); 467 if (ctxPoint->masterSlave) { 468 if (ctxPoint->type == FORWARD_RESERVED || ctxPoint->type == FORWARD_FILESYSTEM) { 469 unlink(sNodeCfg.c_str()); 470 } 471 if (uv_pipe_bind(&ctxPoint->pipe, sNodeCfg.c_str())) { 472 ctxPoint->lastError = "Unix pipe bind failed"; 473 return false; 474 } 475 if (uv_listen((uv_stream_t *)&ctxPoint->pipe, UV_LISTEN_LBACKOG, ListenCallback)) { 476 ctxPoint->lastError = "Unix pipe listen failed"; 477 return false; 478 } 479 } else { 480 if (ctxPoint->type == FORWARD_ABSTRACT) { 481 bool abstractRet = LocalAbstractConnect(&ctxPoint->pipe, sNodeCfg); 482 SetupPointContinue(ctxPoint, abstractRet ? 0 : -1); 483 if (!abstractRet) { 484 ctxPoint->lastError = "LocalAbstractConnect failed"; 485 return false; 486 } 487 } else { 488 uv_connect_t *connect = new(std::nothrow) uv_connect_t(); 489 if (connect == nullptr) { 490 WRITE_LOG(LOG_FATAL, "SetupFilePoint new connect failed"); 491 return false; 492 } 493 connect->data = ctxPoint; 494 uv_pipe_connect(connect, &ctxPoint->pipe, sNodeCfg.c_str(), ConnectTarget); 495 } 496 } 497 return true; 498} 499 500bool HdcForwardBase::SetupPoint(HCtxForward ctxPoint) 501{ 502 bool ret = true; 503 if (!DetechForwardType(ctxPoint)) { 504 return false; 505 } 506 switch (ctxPoint->type) { 507 case FORWARD_TCP: 508 if (!SetupTCPPoint(ctxPoint)) { 509 ret = false; 510 }; 511 break; 512#ifndef _WIN32 513 case FORWARD_DEVICE: 514 if (!SetupDevicePoint(ctxPoint)) { 515 ret = false; 516 }; 517 break; 518 case FORWARD_JDWP: 519 if (!SetupJdwpPoint(ctxPoint)) { 520 ret = false; 521 }; 522 break; 523 case FORWARD_ABSTRACT: 524 case FORWARD_RESERVED: 525 case FORWARD_FILESYSTEM: 526 if (!SetupFilePoint(ctxPoint)) { 527 ret = false; 528 }; 529 break; 530#else 531 case FORWARD_DEVICE: 532 case FORWARD_JDWP: 533 case FORWARD_ABSTRACT: 534 case FORWARD_RESERVED: 535 case FORWARD_FILESYSTEM: 536 ctxPoint->lastError = "Not supoort forward-type"; 537 ret = false; 538 break; 539#endif 540 default: 541 ctxPoint->lastError = "Not supoort forward-type"; 542 ret = false; 543 break; 544 } 545 return ret; 546} 547 548bool HdcForwardBase::BeginForward(const string &command, string &sError) 549{ 550 bool ret = false; 551 int argc = 0; 552 char bufString[BUF_SIZE_SMALL] = ""; 553 HCtxForward ctxPoint = (HCtxForward)MallocContext(true); 554 if (!ctxPoint) { 555 WRITE_LOG(LOG_FATAL, "MallocContext failed"); 556 return false; 557 } 558 char **argv = Base::SplitCommandToArgs(command.c_str(), &argc); 559 if (argv == nullptr) { 560 WRITE_LOG(LOG_FATAL, "SplitCommandToArgs failed"); 561 return false; 562 } 563 while (true) { 564 if (argc < CMD_ARG1_COUNT) { 565 break; 566 } 567 if (strlen(argv[0]) > BUF_SIZE_SMALL || strlen(argv[1]) > BUF_SIZE_SMALL) { 568 break; 569 } 570 if (!CheckNodeInfo(argv[0], ctxPoint->localArgs)) { 571 break; 572 } 573 if (!CheckNodeInfo(argv[1], ctxPoint->remoteArgs)) { 574 break; 575 } 576 ctxPoint->remoteParamenters = argv[1]; 577 if (!SetupPoint(ctxPoint)) { 578 break; 579 } 580 581 ret = true; 582 break; 583 } 584 sError = ctxPoint->lastError; 585 if (ret) { 586 // First 8-byte parameter bit 587 int maxBufSize = sizeof(bufString) - forwardParameterBufSize; 588 if (snprintf_s(bufString + forwardParameterBufSize, maxBufSize, maxBufSize - 1, "%s", argv[1]) > 0) { 589 SendToTask(ctxPoint->id, CMD_FORWARD_CHECK, reinterpret_cast<uint8_t *>(bufString), 590 forwardParameterBufSize + strlen(bufString + forwardParameterBufSize) + 1); 591 taskCommand = command; 592 } 593 } 594 delete[](reinterpret_cast<char *>(argv)); 595 return ret; 596} 597 598inline bool HdcForwardBase::FilterCommand(uint8_t *bufCmdIn, uint32_t *idOut, uint8_t **pContentBuf) 599{ 600 *pContentBuf = bufCmdIn + DWORD_SERIALIZE_SIZE; 601 *idOut = ntohl(*reinterpret_cast<uint32_t *>(bufCmdIn)); 602 return true; 603} 604 605bool HdcForwardBase::SlaveConnect(uint8_t *bufCmd, const int bufSize, bool bCheckPoint, string &sError) 606{ 607 if (bufSize <= DWORD_SERIALIZE_SIZE + forwardParameterBufSize) { 608 WRITE_LOG(LOG_FATAL, "Illegal payloadSize, shorter than forward header"); 609 return false; 610 } 611 bool ret = false; 612 char *content = nullptr; 613 uint32_t idSlaveOld = 0; 614 HCtxForward ctxPoint = (HCtxForward)MallocContext(false); 615 if (!ctxPoint) { 616 WRITE_LOG(LOG_FATAL, "MallocContext failed"); 617 return false; 618 } 619 idSlaveOld = ctxPoint->id; 620 ctxPoint->checkPoint = bCheckPoint; 621 // refresh another id,8byte param 622 FilterCommand(bufCmd, &ctxPoint->id, reinterpret_cast<uint8_t **>(&content)); 623 AdminContext(OP_UPDATE, idSlaveOld, ctxPoint); 624 content += forwardParameterBufSize; 625 if (!CheckNodeInfo(content, ctxPoint->localArgs)) { 626 WRITE_LOG(LOG_FATAL, "SlaveConnect CheckNodeInfo failed content:%s", content); 627 goto Finish; 628 } 629 if (!DetechForwardType(ctxPoint)) { 630 WRITE_LOG(LOG_FATAL, "SlaveConnect DetechForwardType failed content:%s", content); 631 goto Finish; 632 } 633 WRITE_LOG(LOG_DEBUG, "id:%u type:%d", ctxPoint->id, ctxPoint->type); 634 if (ctxPoint->type == FORWARD_ARK) { 635 if (ctxPoint->checkPoint) { 636 if (!SetupArkPoint(ctxPoint)) { 637 sError = ctxPoint->lastError; 638 WRITE_LOG(LOG_FATAL, "SlaveConnect SetupArkPoint failed content:%s", content); 639 goto Finish; 640 } 641 } else { 642 SetupPointContinue(ctxPoint, 0); 643 } 644 ret = true; 645 } else { 646 if (!ctxPoint->checkPoint) { 647 if (!SetupPoint(ctxPoint)) { 648 sError = ctxPoint->lastError; 649 WRITE_LOG(LOG_FATAL, "SlaveConnect SetupPoint failed content:%s", content); 650 goto Finish; 651 } 652 } else { 653 SetupPointContinue(ctxPoint, 0); 654 } 655 ret = true; 656 } 657Finish: 658 if (!ret) { 659 FreeContext(ctxPoint, 0, true); 660 } 661 return ret; 662} 663 664bool HdcForwardBase::DoForwardBegin(HCtxForward ctx) 665{ 666 switch (ctx->type) { 667 case FORWARD_TCP: 668 case FORWARD_JDWP: // jdwp use tcp ->socketpair->jvm 669 uv_tcp_nodelay((uv_tcp_t *)&ctx->tcp, 1); 670 uv_read_start((uv_stream_t *)&ctx->tcp, AllocForwardBuf, ReadForwardBuf); 671 break; 672 case FORWARD_ARK: 673 WRITE_LOG(LOG_DEBUG, "DoForwardBegin ark socketpair id:%u fds[0]:%d", ctx->id, fds[0]); 674 uv_tcp_init(loopTask, &ctx->tcp); 675 uv_tcp_open(&ctx->tcp, fds[0]); 676 uv_tcp_nodelay((uv_tcp_t *)&ctx->tcp, 1); 677 uv_read_start((uv_stream_t *)&ctx->tcp, AllocForwardBuf, ReadForwardBuf); 678 break; 679 case FORWARD_ABSTRACT: 680 case FORWARD_RESERVED: 681 case FORWARD_FILESYSTEM: 682 uv_read_start((uv_stream_t *)&ctx->pipe, AllocForwardBuf, ReadForwardBuf); 683 break; 684 case FORWARD_DEVICE: { 685 ctx->fdClass->StartWorkOnThread(); 686 break; 687 } 688 default: 689 break; 690 } 691 ctx->ready = true; 692 return true; 693} 694 695void *HdcForwardBase::AdminContext(const uint8_t op, const uint32_t id, HCtxForward hInput) 696{ 697 ctxPointMutex.lock(); 698 void *hRet = nullptr; 699 map<uint32_t, HCtxForward> &mapCtx = mapCtxPoint; 700 switch (op) { 701 case OP_ADD: 702 mapCtx[id] = hInput; 703 break; 704 case OP_REMOVE: 705 mapCtx.erase(id); 706 break; 707 case OP_QUERY: 708 if (mapCtx.count(id)) { 709 hRet = mapCtx[id]; 710 } 711 break; 712 case OP_UPDATE: 713 mapCtx.erase(id); 714 mapCtx[hInput->id] = hInput; 715 break; 716 default: 717 break; 718 } 719 ctxPointMutex.unlock(); 720 return hRet; 721} 722 723void HdcForwardBase::SendCallbackForwardBuf(uv_write_t *req, int status) 724{ 725 ContextForwardIO *ctxIO = (ContextForwardIO *)req->data; 726 HCtxForward ctx = reinterpret_cast<HCtxForward>(ctxIO->ctxForward); 727 if (status < 0 && !ctx->finish) { 728 WRITE_LOG(LOG_DEBUG, "SendCallbackForwardBuf ctx->type:%d, status:%d finish", ctx->type, status); 729 ctx->thisClass->FreeContext(ctx, 0, true); 730 } 731 delete[] ctxIO->bufIO; 732 delete ctxIO; 733 delete req; 734} 735 736int HdcForwardBase::SendForwardBuf(HCtxForward ctx, uint8_t *bufPtr, const int size) 737{ 738 int nRet = 0; 739 if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) { 740 WRITE_LOG(LOG_WARN, "SendForwardBuf size:%d > HDC_BUF_MAX_BYTES, ctxId:%u", size, ctx->id); 741 return -1; 742 } 743 if (size <= 0) { 744 WRITE_LOG(LOG_WARN, "SendForwardBuf failed size:%d, ctxId:%u", size, ctx->id); 745 return -1; 746 } 747 auto pDynBuf = new(std::nothrow) uint8_t[size]; 748 if (!pDynBuf) { 749 WRITE_LOG(LOG_WARN, "SendForwardBuf new DynBuf failed size:%d, ctxId:%u", size, ctx->id); 750 return -1; 751 } 752 (void)memcpy_s(pDynBuf, size, bufPtr, size); 753 if (ctx->type == FORWARD_DEVICE) { 754 nRet = ctx->fdClass->WriteWithMem(pDynBuf, size); 755 } else { 756 auto ctxIO = new ContextForwardIO(); 757 if (!ctxIO) { 758 WRITE_LOG(LOG_WARN, "SendForwardBuf new ContextForwardIO failed, ctxId:%u", ctx->id); 759 delete[] pDynBuf; 760 return -1; 761 } 762 ctxIO->ctxForward = ctx; 763 ctxIO->bufIO = pDynBuf; 764 if (ctx->type == FORWARD_TCP || ctx->type == FORWARD_JDWP || ctx->type == FORWARD_ARK) { 765 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->tcp, pDynBuf, size, nullptr, 766 (void *)SendCallbackForwardBuf, (void *)ctxIO); 767 } else { 768 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM, 769 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->pipe, pDynBuf, size, nullptr, 770 (void *)SendCallbackForwardBuf, (void *)ctxIO); 771 } 772 if (nRet < 0) { 773 WRITE_LOG(LOG_WARN, "SendForwardBuf SendToStreamEx ret:%d, size:%d ctxId:%u type:%d", 774 nRet, size, ctx->id, ctx->type); 775 } 776 } 777 return nRet; 778} 779 780bool HdcForwardBase::CommandForwardCheckResult(HCtxForward ctx, uint8_t *payload) 781{ 782 bool ret = true; 783 bool bCheck = static_cast<bool>(payload); 784 LogMsg(bCheck ? MSG_OK : MSG_FAIL, "Forwardport result:%s", bCheck ? "OK" : "Failed"); 785 if (bCheck) { 786 string mapInfo = taskInfo->serverOrDaemon ? "1|" : "0|"; 787 mapInfo += taskCommand; 788 ctx->ready = true; 789 ServerCommand(CMD_FORWARD_SUCCESS, reinterpret_cast<uint8_t *>(const_cast<char *>(mapInfo.c_str())), 790 mapInfo.size() + 1); 791 } else { 792 ret = false; 793 FreeContext(ctx, 0, false); 794 } 795 return ret; 796} 797 798bool HdcForwardBase::ForwardCommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize) 799{ 800 if (payloadSize <= DWORD_SERIALIZE_SIZE && command != CMD_FORWARD_FREE_CONTEXT 801 && command != CMD_FORWARD_ACTIVE_MASTER) { 802 WRITE_LOG(LOG_FATAL, "Illegal payloadSize, shorter than forward command header"); 803 return false; 804 } 805 bool ret = true; 806 uint8_t *pContent = nullptr; 807 int sizeContent = 0; 808 uint32_t id = 0; 809 HCtxForward ctx = nullptr; 810 FilterCommand(payload, &id, &pContent); 811 sizeContent = payloadSize - DWORD_SERIALIZE_SIZE; 812 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) { 813 WRITE_LOG(LOG_WARN, "Query id:%u failed", id); 814 return true; 815 } 816 switch (command) { 817 case CMD_FORWARD_CHECK_RESULT: { 818 ret = CommandForwardCheckResult(ctx, pContent); 819 break; 820 } 821 case CMD_FORWARD_ACTIVE_MASTER: { 822 ret = DoForwardBegin(ctx); 823 break; 824 } 825 case CMD_FORWARD_DATA: { 826 if (ctx->finish) { 827 break; 828 } 829 if (SendForwardBuf(ctx, pContent, sizeContent) < 0) { 830 WRITE_LOG(LOG_WARN, "ForwardCommandDispatch SendForwardBuf rc < 0, ctxid:%u", ctx->id); 831 FreeContext(ctx, 0, true); 832 } 833 break; 834 } 835 case CMD_FORWARD_FREE_CONTEXT: { 836 FreeContext(ctx, 0, false); 837 break; 838 } 839 default: 840 ret = false; 841 break; 842 } 843 if (!ret) { 844 if (ctx) { 845 FreeContext(ctx, 0, true); 846 } else { 847 WRITE_LOG(LOG_DEBUG, "ctx==nullptr raw free"); 848 TaskFinish(); 849 } 850 } 851 return ret; 852} 853 854bool HdcForwardBase::CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize) 855{ 856 if (command != CMD_FORWARD_DATA) { 857 WRITE_LOG(LOG_WARN, "CommandDispatch command:%d payloadSize:%d", command, payloadSize); 858 } 859 bool ret = true; 860 string sError; 861 // prepare 862 if (command == CMD_FORWARD_INIT) { 863 string strCommand(reinterpret_cast<char *>(payload), payloadSize); 864 if (!BeginForward(strCommand, sError)) { 865 ret = false; 866 goto Finish; 867 } 868 return true; 869 } else if (command == CMD_FORWARD_CHECK) { 870 // Detect remote if it's reachable 871 if (!SlaveConnect(payload, payloadSize, true, sError)) { 872 ret = false; 873 goto Finish; 874 } 875 return true; 876 } else if (command == CMD_FORWARD_ACTIVE_SLAVE) { 877 // slave connect target port when activating 878 if (!SlaveConnect(payload, payloadSize, false, sError)) { 879 ret = false; 880 goto Finish; 881 } 882 return true; 883 } 884 if (!ForwardCommandDispatch(command, payload, payloadSize)) { 885 ret = false; 886 goto Finish; 887 } 888Finish: 889 if (!ret) { 890 if (!sError.size()) { 891 LogMsg(MSG_FAIL, "Forward parament failed"); 892 } else { 893 LogMsg(MSG_FAIL, const_cast<char *>(sError.c_str())); 894 WRITE_LOG(LOG_WARN, const_cast<char *>(sError.c_str())); 895 } 896 } 897 return ret; 898} 899} // namespace Hdc 900