1/* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include "channel.h" 16namespace Hdc { 17HdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn) 18{ 19 SetChannelTCPString(addrString); 20 isServerOrClient = serverOrClient; 21 loopMain = loopMainIn; 22 threadChanneMain = uv_thread_self(); 23 uv_rwlock_init(&mainAsync); 24 uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback); 25 uv_rwlock_init(&lockMapChannel); 26} 27 28HdcChannelBase::~HdcChannelBase() 29{ 30 ClearChannels(); 31 // clear 32 if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) { 33 uv_close((uv_handle_t *)&asyncMainLoop, nullptr); 34 } 35 36 uv_rwlock_destroy(&mainAsync); 37 uv_rwlock_destroy(&lockMapChannel); 38} 39 40vector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const 41{ 42 vector<uint8_t> ret; 43 struct ChannelHandShake handshake = {}; 44 if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) { 45 return ret; 46 } 47 if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) { 48 return ret; 49 } 50 ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake)); 51 return ret; 52} 53 54bool HdcChannelBase::SetChannelTCPString(const string &addrString) 55{ 56 bool ret = false; 57 while (true) { 58 if (addrString.find(":") == string::npos) { 59 break; 60 } 61 std::size_t found = addrString.find_last_of(":"); 62 if (found == string::npos) { 63 break; 64 } 65 66 string host = addrString.substr(0, found); 67 string port = addrString.substr(found + 1); 68 69 channelPort = std::atoi(port.c_str()); 70 sockaddr_in addrv4; 71 sockaddr_in6 addrv6; 72 if (!channelPort) { 73 break; 74 } 75 76 if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 && 77 uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) { 78 break; 79 } 80 channelHost = host; 81 channelHostPort = addrString; 82 ret = true; 83 break; 84 } 85 if (!ret) { 86 channelPort = 0; 87 channelHost = STRING_EMPTY; 88 channelHostPort = STRING_EMPTY; 89 } 90 return ret; 91} 92 93void HdcChannelBase::ClearChannels() 94{ 95 for (auto v : mapChannel) { 96 HChannel hChannel = (HChannel)v.second; 97 if (!hChannel->isDead) { 98 FreeChannel(hChannel->channelId); 99 } 100 } 101} 102 103void HdcChannelBase::WorkerPendding() 104{ 105 WRITE_LOG(LOG_DEBUG, "Begin host channel pendding"); 106 uv_run(loopMain, UV_RUN_DEFAULT); 107 uv_loop_close(loopMain); 108} 109 110void HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf) 111{ 112 StartTraceScope("HdcChannelBase::ReadStream"); 113 int size = 0; 114 int indexBuf = 0; 115 int childRet = 0; 116 bool needExit = false; 117 HChannel hChannel = (HChannel)tcp->data; 118 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel; 119 uint32_t channelId = hChannel->channelId; 120 121 if (nread == UV_ENOBUFS) { 122 WRITE_LOG(LOG_FATAL, "ReadStream nobufs channelId:%u", channelId); 123 return; 124 } else if (nread == 0) { 125 // maybe just after accept, second client req 126 WRITE_LOG(LOG_DEBUG, "ReadStream idle read channelId:%u", channelId); 127 return; 128 } else if (nread < 0) { 129 Base::TryCloseHandle((uv_handle_t *)tcp); 130 constexpr int bufSize = 1024; 131 char buffer[bufSize] = { 0 }; 132 uv_err_name_r(nread, buffer, bufSize); 133 WRITE_LOG(LOG_DEBUG, "ReadStream channelId:%u failed:%s", channelId, buffer); 134 needExit = true; 135 goto Finish; 136 } else { 137 hChannel->availTailIndex += nread; 138 } 139 while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) { 140 size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf)); // big endian 141 if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) { 142 WRITE_LOG(LOG_FATAL, "ReadStream size:%d channelId:%u", size, channelId); 143 needExit = true; 144 break; 145 } 146 if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) { 147 break; 148 } 149 childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) + 150 DWORD_SERIALIZE_SIZE + indexBuf, size); 151 if (childRet < 0) { 152 WRITE_LOG(LOG_WARN, "ReadStream childRet:%d channelId:%u keepAlive:%d", 153 childRet, channelId, hChannel->keepAlive); 154 if (!hChannel->keepAlive) { 155 needExit = true; 156 break; 157 } 158 } 159 // update io 160 hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size); 161 indexBuf += DWORD_SERIALIZE_SIZE + size; 162 } 163 if (indexBuf > 0 && hChannel->availTailIndex > 0) { 164 if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) { 165 needExit = true; 166 goto Finish; 167 } 168 } 169 170Finish: 171 if (needExit) { 172 thisClass->FreeChannel(hChannel->channelId); 173 WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish channelId:%u", channelId); 174 } 175} 176 177void HdcChannelBase::WriteCallback(uv_write_t *req, int status) 178{ 179 HChannel hChannel = (HChannel)req->handle->data; 180 --hChannel->ref; 181 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel; 182 if (status < 0) { 183 hChannel->writeFailedTimes++; 184 Base::TryCloseHandle((uv_handle_t *)req->handle); 185 if (!hChannel->isDead && !hChannel->ref) { 186 thisClass->FreeChannel(hChannel->channelId); 187 } 188 } 189 delete[]((uint8_t *)req->data); 190 delete req; 191} 192 193void HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle) 194{ 195 AsyncParam *param = (AsyncParam *)handle->data; 196 HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass; 197 198 switch (param->method) { 199 case ASYNC_FREE_CHANNEL: { 200 // alloc/release should pair in main thread. 201 thisClass->FreeChannel(param->sid); 202 break; 203 } 204 default: 205 break; 206 } 207 if (param->data) { 208 delete[]((uint8_t *)param->data); 209 } 210 delete param; 211 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback); 212} 213 214// multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected. 215// eg: if uv_async_send() 5 times before callback calling,it will be called only once. 216// if uv_async_send() is called again after callback calling, it will be called again. 217void HdcChannelBase::MainAsyncCallback(uv_async_t *handle) 218{ 219 HdcChannelBase *thisClass = (HdcChannelBase *)handle->data; 220 if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) { 221 WRITE_LOG(LOG_WARN, "MainAsyncCallback uv_is_closing loopMain"); 222 return; 223 } 224 list<void *>::iterator i; 225 list<void *> &lst = thisClass->lstMainThreadOP; 226 uv_rwlock_wrlock(&thisClass->mainAsync); 227 for (i = lst.begin(); i != lst.end();) { 228 AsyncParam *param = (AsyncParam *)*i; 229 Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask); 230 i = lst.erase(i); 231 } 232 uv_rwlock_wrunlock(&thisClass->mainAsync); 233} 234 235void HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data, 236 const int dataSize) 237{ 238 if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) { 239 WRITE_LOG(LOG_WARN, "PushAsyncMessage uv_is_closing asyncMainLoop"); 240 return; 241 } 242 auto param = new AsyncParam(); 243 if (!param) { 244 return; 245 } 246 param->sid = channelId; // Borrow SID storage 247 param->thisClass = this; 248 param->method = method; 249 if (dataSize > 0) { 250 param->dataSize = dataSize; 251 param->data = new uint8_t[param->dataSize](); 252 if (!param->data) { 253 delete param; 254 return; 255 } 256 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) { 257 delete[]((uint8_t *)param->data); 258 delete param; 259 return; 260 } 261 } 262 asyncMainLoop.data = this; 263 uv_rwlock_wrlock(&mainAsync); 264 lstMainThreadOP.push_back(param); 265 uv_rwlock_wrunlock(&mainAsync); 266 uv_async_send(&asyncMainLoop); 267} 268 269// add commandflag ahead real buf data 270void HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size) 271{ 272 StartTraceScope("HdcChannelBase::SendChannelWithCmd"); 273 auto data = new uint8_t[size + sizeof(commandFlag)](); 274 if (!data) { 275 return; 276 } 277 278 if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) { 279 delete[] data; 280 return; 281 } 282 283 if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) { 284 delete[] data; 285 return; 286 } 287 288 SendChannel(hChannel, data, size + sizeof(commandFlag)); 289 delete[] data; 290} 291 292void HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size) 293{ 294 StartTraceScope("HdcChannelBase::SendWithCmd"); 295 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr)); 296 if (!hChannel) { 297 WRITE_LOG(LOG_FATAL, "SendWithCmd hChannel nullptr channelId:%u", channelId); 298 return; 299 } 300 do { 301 if (hChannel->isDead) { 302 WRITE_LOG(LOG_FATAL, "SendWithCmd isDead channelId:%u", channelId); 303 break; 304 } 305 SendChannelWithCmd(hChannel, commandFlag, bufPtr, size); 306 } while (false); 307 --hChannel->ref; 308} 309 310void HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size) 311{ 312 StartTraceScope("HdcChannelBase::SendChannel"); 313 uv_stream_t *sendStream = nullptr; 314 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE; 315 auto data = new uint8_t[sizeNewBuf](); 316 if (!data) { 317 return; 318 } 319 *reinterpret_cast<uint32_t *>(data) = htonl(size); // big endian 320 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) { 321 delete[] data; 322 return; 323 } 324 if (hChannel->hWorkThread == uv_thread_self()) { 325 sendStream = (uv_stream_t *)&hChannel->hWorkTCP; 326 } else { 327 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP; 328 } 329 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) { 330 ++hChannel->ref; 331 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data); 332 } else { 333 delete[] data; 334 } 335} 336 337// works only in current working thread 338void HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size) 339{ 340 StartTraceScope("HdcChannelBase::Send"); 341 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr)); 342 if (!hChannel) { 343 WRITE_LOG(LOG_FATAL, "Send hChannel nullptr channelId:%u", channelId); 344 return; 345 } 346 do { 347 if (hChannel->isDead) { 348 WRITE_LOG(LOG_FATAL, "Send isDead channelId:%u", channelId); 349 break; 350 } 351 SendChannel(hChannel, bufPtr, size); 352 } while (false); 353 --hChannel->ref; 354} 355 356void HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf) 357{ 358 HChannel context = (HChannel)handle->data; 359 Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * BUF_EXTEND_SIZE); 360 buf->base = (char *)context->ioBuf + context->availTailIndex; 361 int size = context->bufSize - context->availTailIndex; 362 buf->len = std::min(size, static_cast<int>(sizeWanted)); 363} 364 365uint32_t HdcChannelBase::GetChannelPseudoUid() 366{ 367 uint32_t uid = 0; 368 do { 369 uid = Base::GetSecureRandom(); 370 } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr); 371 return uid; 372} 373 374uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel) 375{ 376#ifdef CONFIG_USE_JEMALLOC_DFX_INIF 377 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE); 378 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE); 379#endif 380 auto hChannel = new HdcChannel(); 381 if (!hChannel) { 382 WRITE_LOG(LOG_FATAL, "malloc channel failed"); 383 return 0; 384 } 385 hChannel->stdinTty.data = nullptr; 386 hChannel->stdoutTty.data = nullptr; 387 uint32_t channelId = GetChannelPseudoUid(); 388 if (isServerOrClient) { 389 hChannel->serverOrClient = isServerOrClient; 390 ++channelId; // Use different value for serverForClient&client in per process 391 } 392 uv_tcp_init(loopMain, &hChannel->hWorkTCP); 393 ++hChannel->uvHandleRef; 394 hChannel->hWorkThread = uv_thread_self(); 395 hChannel->hWorkTCP.data = hChannel; 396 hChannel->clsChannel = this; 397 hChannel->channelId = channelId; 398 (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t)); 399 AdminChannel(OP_ADD, channelId, hChannel); 400 *hOutChannel = hChannel; 401 if (isServerOrClient) { 402 WRITE_LOG(LOG_INFO, "Mallocchannel:%u", channelId); 403 } else { 404 WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId); 405 } 406 return channelId; 407} 408 409// work when libuv-handle at struct of HdcSession has all callback finished 410void HdcChannelBase::FreeChannelFinally(uv_idle_t *handle) 411{ 412 HChannel hChannel = (HChannel)handle->data; 413 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel; 414 if (hChannel->uvHandleRef > 0) { 415 if (hChannel->serverOrClient) { 416 WRITE_LOG(LOG_INFO, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u", 417 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId); 418 } else { 419 WRITE_LOG(LOG_DEBUG, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u", 420 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId); 421 } 422 return; 423 } 424 thisClass->NotifyInstanceChannelFree(hChannel); 425 thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr); 426 427 if (!hChannel->serverOrClient) { 428 WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u sid:%u finish", 429 hChannel->channelId, hChannel->targetSessionId); 430 uv_stop(thisClass->loopMain); 431 } else { 432 WRITE_LOG(LOG_INFO, "!!!FreeChannelFinally channelId:%u sid:%u finish", 433 hChannel->channelId, hChannel->targetSessionId); 434 } 435#ifdef HDC_HOST 436 Base::TryCloseHandle((const uv_handle_t *)&hChannel->hChildWorkTCP); 437#endif 438 delete hChannel; 439 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback); 440} 441 442void HdcChannelBase::FreeChannelContinue(HChannel hChannel) 443{ 444 auto closeChannelHandle = [](uv_handle_t *handle) -> void { 445 if (handle->data == nullptr) { 446 WRITE_LOG(LOG_DEBUG, "FreeChannelContinue handle->data is nullptr"); 447 return; 448 } 449 HChannel channel = reinterpret_cast<HChannel>(handle->data); 450 --channel->uvHandleRef; 451 Base::TryCloseHandle((uv_handle_t *)handle); 452 }; 453 hChannel->availTailIndex = 0; 454 if (hChannel->ioBuf) { 455 delete[] hChannel->ioBuf; 456 hChannel->ioBuf = nullptr; 457 } 458 if (!hChannel->serverOrClient) { 459 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle); 460 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle); 461 } 462 if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) { 463 --hChannel->uvHandleRef; 464 } else { 465 Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle); 466 } 467 Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally); 468} 469 470void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle) 471{ 472 HChannel hChannel = (HChannel)handle->data; 473 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel; 474 if (hChannel->ref > 0) { 475 return; 476 } 477 thisClass->DispMntnInfo(hChannel); 478 if (hChannel->hChildWorkTCP.loop) { 479 auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0); 480 bool ret = thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId); 481 if (!ret) { 482 WRITE_LOG(LOG_WARN, "FreeChannelOpeate deatch failed channelId:%u sid:%u", 483 hChannel->channelId, hChannel->targetSessionId); 484 hChannel->childCleared = true; 485 } 486 auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void { 487 HChannel hChannel = (HChannel)handle->data; 488 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel; 489 if (!hChannel->childCleared) { 490 WRITE_LOG(LOG_WARN, "FreeChannelOpeate childCleared:%d channelId:%u sid:%u", 491 hChannel->childCleared, hChannel->channelId, hChannel->targetSessionId); 492 return; 493 } 494 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback); 495 thisClass->FreeChannelContinue(hChannel); 496 }; 497 Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue); 498 } else { 499 thisClass->FreeChannelContinue(hChannel); 500 } 501 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback); 502} 503 504void HdcChannelBase::FreeChannel(const uint32_t channelId) 505{ 506 if (threadChanneMain != uv_thread_self()) { 507 PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0); 508 WRITE_LOG(LOG_INFO, "FreeChannel not uv_thread_self channelid:%u", channelId); 509 return; 510 } 511 HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr); 512 do { 513 if (!hChannel || hChannel->isDead) { 514 WRITE_LOG(LOG_WARN, "FreeChannel hChannel nullptr or isDead channelid:%u", channelId); 515 break; 516 } 517 WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId); 518 Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT); // do immediately 519 hChannel->isDead = true; 520 } while (false); 521} 522 523HChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput) 524{ 525 HChannel hRet = nullptr; 526 switch (op) { 527 case OP_ADD: 528 uv_rwlock_wrlock(&lockMapChannel); 529 mapChannel[channelId] = hInput; 530 uv_rwlock_wrunlock(&lockMapChannel); 531 break; 532 case OP_REMOVE: 533 uv_rwlock_wrlock(&lockMapChannel); 534 mapChannel.erase(channelId); 535 uv_rwlock_wrunlock(&lockMapChannel); 536 break; 537 case OP_QUERY: 538 uv_rwlock_rdlock(&lockMapChannel); 539 if (mapChannel.count(channelId)) { 540 hRet = mapChannel[channelId]; 541 } 542 uv_rwlock_rdunlock(&lockMapChannel); 543 break; 544 case OP_QUERY_REF: 545 uv_rwlock_wrlock(&lockMapChannel); 546 if (mapChannel.count(channelId)) { 547 hRet = mapChannel[channelId]; 548 ++hRet->ref; 549 } 550 uv_rwlock_wrunlock(&lockMapChannel); 551 break; 552 case OP_UPDATE: 553 uv_rwlock_wrlock(&lockMapChannel); 554 // remove old 555 mapChannel.erase(channelId); 556 mapChannel[hInput->channelId] = hInput; 557 uv_rwlock_wrunlock(&lockMapChannel); 558 break; 559 default: 560 break; 561 } 562 return hRet; 563} 564 565void HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size) 566{ 567 StartTraceScope("HdcChannelBase::EchoToClient"); 568 uv_stream_t *sendStream = nullptr; 569 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE; 570 auto data = new uint8_t[sizeNewBuf](); 571 if (!data) { 572 return; 573 } 574 *reinterpret_cast<uint32_t *>(data) = htonl(size); 575 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) { 576 delete[] data; 577 return; 578 } 579 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP; 580 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) { 581 ++hChannel->ref; 582 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data); 583 } else { 584 WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId); 585 delete[] data; 586 } 587} 588 589void HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo) 590{ 591 for (auto v : mapChannel) { 592 HChannel hChannel = (HChannel)v.second; 593 if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) { 594 WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str()); 595 EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size()); 596 } 597 } 598} 599 600void HdcChannelBase::DispMntnInfo(HChannel hChannel) 601{ 602 if (!hChannel) { 603 WRITE_LOG(LOG_WARN, "prt is null"); 604 return; 605 } 606 WRITE_LOG(LOG_DEBUG, "channel info: id:%u isDead:%d ref:%u, writeFailedTimes:%u", 607 hChannel->channelId, hChannel->isDead, uint32_t(hChannel->ref), uint32_t(hChannel->writeFailedTimes)); 608} 609} 610