1cc290419Sopenharmony_ci/* 2cc290419Sopenharmony_ci * Copyright (C) 2021 Huawei Device Co., Ltd. 3cc290419Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4cc290419Sopenharmony_ci * you may not use this file except in compliance with the License. 5cc290419Sopenharmony_ci * You may obtain a copy of the License at 6cc290419Sopenharmony_ci * 7cc290419Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8cc290419Sopenharmony_ci * 9cc290419Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10cc290419Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11cc290419Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12cc290419Sopenharmony_ci * See the License for the specific language governing permissions and 13cc290419Sopenharmony_ci * limitations under the License. 14cc290419Sopenharmony_ci */ 15cc290419Sopenharmony_ci#include "channel.h" 16cc290419Sopenharmony_cinamespace Hdc { 17cc290419Sopenharmony_ciHdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn) 18cc290419Sopenharmony_ci{ 19cc290419Sopenharmony_ci SetChannelTCPString(addrString); 20cc290419Sopenharmony_ci isServerOrClient = serverOrClient; 21cc290419Sopenharmony_ci loopMain = loopMainIn; 22cc290419Sopenharmony_ci threadChanneMain = uv_thread_self(); 23cc290419Sopenharmony_ci uv_rwlock_init(&mainAsync); 24cc290419Sopenharmony_ci uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback); 25cc290419Sopenharmony_ci uv_rwlock_init(&lockMapChannel); 26cc290419Sopenharmony_ci} 27cc290419Sopenharmony_ci 28cc290419Sopenharmony_ciHdcChannelBase::~HdcChannelBase() 29cc290419Sopenharmony_ci{ 30cc290419Sopenharmony_ci ClearChannels(); 31cc290419Sopenharmony_ci // clear 32cc290419Sopenharmony_ci if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) { 33cc290419Sopenharmony_ci uv_close((uv_handle_t *)&asyncMainLoop, nullptr); 34cc290419Sopenharmony_ci } 35cc290419Sopenharmony_ci 36cc290419Sopenharmony_ci uv_rwlock_destroy(&mainAsync); 37cc290419Sopenharmony_ci uv_rwlock_destroy(&lockMapChannel); 38cc290419Sopenharmony_ci} 39cc290419Sopenharmony_ci 40cc290419Sopenharmony_civector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const 41cc290419Sopenharmony_ci{ 42cc290419Sopenharmony_ci vector<uint8_t> ret; 43cc290419Sopenharmony_ci struct ChannelHandShake handshake = {}; 44cc290419Sopenharmony_ci if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) { 45cc290419Sopenharmony_ci return ret; 46cc290419Sopenharmony_ci } 47cc290419Sopenharmony_ci if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) { 48cc290419Sopenharmony_ci return ret; 49cc290419Sopenharmony_ci } 50cc290419Sopenharmony_ci ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake)); 51cc290419Sopenharmony_ci return ret; 52cc290419Sopenharmony_ci} 53cc290419Sopenharmony_ci 54cc290419Sopenharmony_cibool HdcChannelBase::SetChannelTCPString(const string &addrString) 55cc290419Sopenharmony_ci{ 56cc290419Sopenharmony_ci bool ret = false; 57cc290419Sopenharmony_ci while (true) { 58cc290419Sopenharmony_ci if (addrString.find(":") == string::npos) { 59cc290419Sopenharmony_ci break; 60cc290419Sopenharmony_ci } 61cc290419Sopenharmony_ci std::size_t found = addrString.find_last_of(":"); 62cc290419Sopenharmony_ci if (found == string::npos) { 63cc290419Sopenharmony_ci break; 64cc290419Sopenharmony_ci } 65cc290419Sopenharmony_ci 66cc290419Sopenharmony_ci string host = addrString.substr(0, found); 67cc290419Sopenharmony_ci string port = addrString.substr(found + 1); 68cc290419Sopenharmony_ci 69cc290419Sopenharmony_ci channelPort = std::atoi(port.c_str()); 70cc290419Sopenharmony_ci sockaddr_in addrv4; 71cc290419Sopenharmony_ci sockaddr_in6 addrv6; 72cc290419Sopenharmony_ci if (!channelPort) { 73cc290419Sopenharmony_ci break; 74cc290419Sopenharmony_ci } 75cc290419Sopenharmony_ci 76cc290419Sopenharmony_ci if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 && 77cc290419Sopenharmony_ci uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) { 78cc290419Sopenharmony_ci break; 79cc290419Sopenharmony_ci } 80cc290419Sopenharmony_ci channelHost = host; 81cc290419Sopenharmony_ci channelHostPort = addrString; 82cc290419Sopenharmony_ci ret = true; 83cc290419Sopenharmony_ci break; 84cc290419Sopenharmony_ci } 85cc290419Sopenharmony_ci if (!ret) { 86cc290419Sopenharmony_ci channelPort = 0; 87cc290419Sopenharmony_ci channelHost = STRING_EMPTY; 88cc290419Sopenharmony_ci channelHostPort = STRING_EMPTY; 89cc290419Sopenharmony_ci } 90cc290419Sopenharmony_ci return ret; 91cc290419Sopenharmony_ci} 92cc290419Sopenharmony_ci 93cc290419Sopenharmony_civoid HdcChannelBase::ClearChannels() 94cc290419Sopenharmony_ci{ 95cc290419Sopenharmony_ci for (auto v : mapChannel) { 96cc290419Sopenharmony_ci HChannel hChannel = (HChannel)v.second; 97cc290419Sopenharmony_ci if (!hChannel->isDead) { 98cc290419Sopenharmony_ci FreeChannel(hChannel->channelId); 99cc290419Sopenharmony_ci } 100cc290419Sopenharmony_ci } 101cc290419Sopenharmony_ci} 102cc290419Sopenharmony_ci 103cc290419Sopenharmony_civoid HdcChannelBase::WorkerPendding() 104cc290419Sopenharmony_ci{ 105cc290419Sopenharmony_ci WRITE_LOG(LOG_DEBUG, "Begin host channel pendding"); 106cc290419Sopenharmony_ci uv_run(loopMain, UV_RUN_DEFAULT); 107cc290419Sopenharmony_ci uv_loop_close(loopMain); 108cc290419Sopenharmony_ci} 109cc290419Sopenharmony_ci 110cc290419Sopenharmony_civoid HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf) 111cc290419Sopenharmony_ci{ 112cc290419Sopenharmony_ci StartTraceScope("HdcChannelBase::ReadStream"); 113cc290419Sopenharmony_ci int size = 0; 114cc290419Sopenharmony_ci int indexBuf = 0; 115cc290419Sopenharmony_ci int childRet = 0; 116cc290419Sopenharmony_ci bool needExit = false; 117cc290419Sopenharmony_ci HChannel hChannel = (HChannel)tcp->data; 118cc290419Sopenharmony_ci HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel; 119cc290419Sopenharmony_ci uint32_t channelId = hChannel->channelId; 120cc290419Sopenharmony_ci 121cc290419Sopenharmony_ci if (nread == UV_ENOBUFS) { 122cc290419Sopenharmony_ci WRITE_LOG(LOG_FATAL, "ReadStream nobufs channelId:%u", channelId); 123cc290419Sopenharmony_ci return; 124cc290419Sopenharmony_ci } else if (nread == 0) { 125cc290419Sopenharmony_ci // maybe just after accept, second client req 126cc290419Sopenharmony_ci WRITE_LOG(LOG_DEBUG, "ReadStream idle read channelId:%u", channelId); 127cc290419Sopenharmony_ci return; 128cc290419Sopenharmony_ci } else if (nread < 0) { 129cc290419Sopenharmony_ci Base::TryCloseHandle((uv_handle_t *)tcp); 130cc290419Sopenharmony_ci constexpr int bufSize = 1024; 131cc290419Sopenharmony_ci char buffer[bufSize] = { 0 }; 132cc290419Sopenharmony_ci uv_err_name_r(nread, buffer, bufSize); 133cc290419Sopenharmony_ci WRITE_LOG(LOG_DEBUG, "ReadStream channelId:%u failed:%s", channelId, buffer); 134cc290419Sopenharmony_ci needExit = true; 135cc290419Sopenharmony_ci goto Finish; 136cc290419Sopenharmony_ci } else { 137cc290419Sopenharmony_ci hChannel->availTailIndex += nread; 138cc290419Sopenharmony_ci } 139cc290419Sopenharmony_ci while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) { 140cc290419Sopenharmony_ci size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf)); // big endian 141cc290419Sopenharmony_ci if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) { 142cc290419Sopenharmony_ci WRITE_LOG(LOG_FATAL, "ReadStream size:%d channelId:%u", size, channelId); 143cc290419Sopenharmony_ci needExit = true; 144cc290419Sopenharmony_ci break; 145cc290419Sopenharmony_ci } 146cc290419Sopenharmony_ci if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) { 147cc290419Sopenharmony_ci break; 148cc290419Sopenharmony_ci } 149cc290419Sopenharmony_ci childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) + 150cc290419Sopenharmony_ci DWORD_SERIALIZE_SIZE + indexBuf, size); 151cc290419Sopenharmony_ci if (childRet < 0) { 152cc290419Sopenharmony_ci WRITE_LOG(LOG_WARN, "ReadStream childRet:%d channelId:%u keepAlive:%d", 153cc290419Sopenharmony_ci childRet, channelId, hChannel->keepAlive); 154cc290419Sopenharmony_ci if (!hChannel->keepAlive) { 155cc290419Sopenharmony_ci needExit = true; 156cc290419Sopenharmony_ci break; 157cc290419Sopenharmony_ci } 158cc290419Sopenharmony_ci } 159cc290419Sopenharmony_ci // update io 160cc290419Sopenharmony_ci hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size); 161cc290419Sopenharmony_ci indexBuf += DWORD_SERIALIZE_SIZE + size; 162cc290419Sopenharmony_ci } 163cc290419Sopenharmony_ci if (indexBuf > 0 && hChannel->availTailIndex > 0) { 164cc290419Sopenharmony_ci if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) { 165cc290419Sopenharmony_ci needExit = true; 166cc290419Sopenharmony_ci goto Finish; 167cc290419Sopenharmony_ci } 168cc290419Sopenharmony_ci } 169cc290419Sopenharmony_ci 170cc290419Sopenharmony_ciFinish: 171cc290419Sopenharmony_ci if (needExit) { 172cc290419Sopenharmony_ci thisClass->FreeChannel(hChannel->channelId); 173cc290419Sopenharmony_ci WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish channelId:%u", channelId); 174cc290419Sopenharmony_ci } 175cc290419Sopenharmony_ci} 176cc290419Sopenharmony_ci 177cc290419Sopenharmony_civoid HdcChannelBase::WriteCallback(uv_write_t *req, int status) 178cc290419Sopenharmony_ci{ 179cc290419Sopenharmony_ci HChannel hChannel = (HChannel)req->handle->data; 180cc290419Sopenharmony_ci --hChannel->ref; 181cc290419Sopenharmony_ci HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel; 182cc290419Sopenharmony_ci if (status < 0) { 183cc290419Sopenharmony_ci hChannel->writeFailedTimes++; 184cc290419Sopenharmony_ci Base::TryCloseHandle((uv_handle_t *)req->handle); 185cc290419Sopenharmony_ci if (!hChannel->isDead && !hChannel->ref) { 186cc290419Sopenharmony_ci thisClass->FreeChannel(hChannel->channelId); 187cc290419Sopenharmony_ci } 188cc290419Sopenharmony_ci } 189cc290419Sopenharmony_ci delete[]((uint8_t *)req->data); 190cc290419Sopenharmony_ci delete req; 191cc290419Sopenharmony_ci} 192cc290419Sopenharmony_ci 193cc290419Sopenharmony_civoid HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle) 194cc290419Sopenharmony_ci{ 195cc290419Sopenharmony_ci AsyncParam *param = (AsyncParam *)handle->data; 196cc290419Sopenharmony_ci HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass; 197cc290419Sopenharmony_ci 198cc290419Sopenharmony_ci switch (param->method) { 199cc290419Sopenharmony_ci case ASYNC_FREE_CHANNEL: { 200cc290419Sopenharmony_ci // alloc/release should pair in main thread. 201cc290419Sopenharmony_ci thisClass->FreeChannel(param->sid); 202cc290419Sopenharmony_ci break; 203cc290419Sopenharmony_ci } 204cc290419Sopenharmony_ci default: 205cc290419Sopenharmony_ci break; 206cc290419Sopenharmony_ci } 207cc290419Sopenharmony_ci if (param->data) { 208cc290419Sopenharmony_ci delete[]((uint8_t *)param->data); 209cc290419Sopenharmony_ci } 210cc290419Sopenharmony_ci delete param; 211cc290419Sopenharmony_ci uv_close((uv_handle_t *)handle, Base::CloseIdleCallback); 212cc290419Sopenharmony_ci} 213cc290419Sopenharmony_ci 214cc290419Sopenharmony_ci// multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected. 215cc290419Sopenharmony_ci// eg: if uv_async_send() 5 times before callback calling,it will be called only once. 216cc290419Sopenharmony_ci// if uv_async_send() is called again after callback calling, it will be called again. 217cc290419Sopenharmony_civoid HdcChannelBase::MainAsyncCallback(uv_async_t *handle) 218cc290419Sopenharmony_ci{ 219cc290419Sopenharmony_ci HdcChannelBase *thisClass = (HdcChannelBase *)handle->data; 220cc290419Sopenharmony_ci if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) { 221cc290419Sopenharmony_ci WRITE_LOG(LOG_WARN, "MainAsyncCallback uv_is_closing loopMain"); 222cc290419Sopenharmony_ci return; 223cc290419Sopenharmony_ci } 224cc290419Sopenharmony_ci list<void *>::iterator i; 225cc290419Sopenharmony_ci list<void *> &lst = thisClass->lstMainThreadOP; 226cc290419Sopenharmony_ci uv_rwlock_wrlock(&thisClass->mainAsync); 227cc290419Sopenharmony_ci for (i = lst.begin(); i != lst.end();) { 228cc290419Sopenharmony_ci AsyncParam *param = (AsyncParam *)*i; 229cc290419Sopenharmony_ci Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask); 230cc290419Sopenharmony_ci i = lst.erase(i); 231cc290419Sopenharmony_ci } 232cc290419Sopenharmony_ci uv_rwlock_wrunlock(&thisClass->mainAsync); 233cc290419Sopenharmony_ci} 234cc290419Sopenharmony_ci 235cc290419Sopenharmony_civoid HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data, 236cc290419Sopenharmony_ci const int dataSize) 237cc290419Sopenharmony_ci{ 238cc290419Sopenharmony_ci if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) { 239cc290419Sopenharmony_ci WRITE_LOG(LOG_WARN, "PushAsyncMessage uv_is_closing asyncMainLoop"); 240cc290419Sopenharmony_ci return; 241cc290419Sopenharmony_ci } 242cc290419Sopenharmony_ci auto param = new AsyncParam(); 243cc290419Sopenharmony_ci if (!param) { 244cc290419Sopenharmony_ci return; 245cc290419Sopenharmony_ci } 246cc290419Sopenharmony_ci param->sid = channelId; // Borrow SID storage 247cc290419Sopenharmony_ci param->thisClass = this; 248cc290419Sopenharmony_ci param->method = method; 249cc290419Sopenharmony_ci if (dataSize > 0) { 250cc290419Sopenharmony_ci param->dataSize = dataSize; 251cc290419Sopenharmony_ci param->data = new uint8_t[param->dataSize](); 252cc290419Sopenharmony_ci if (!param->data) { 253cc290419Sopenharmony_ci delete param; 254cc290419Sopenharmony_ci return; 255cc290419Sopenharmony_ci } 256cc290419Sopenharmony_ci if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) { 257cc290419Sopenharmony_ci delete[]((uint8_t *)param->data); 258cc290419Sopenharmony_ci delete param; 259cc290419Sopenharmony_ci return; 260cc290419Sopenharmony_ci } 261cc290419Sopenharmony_ci } 262cc290419Sopenharmony_ci asyncMainLoop.data = this; 263cc290419Sopenharmony_ci uv_rwlock_wrlock(&mainAsync); 264cc290419Sopenharmony_ci lstMainThreadOP.push_back(param); 265cc290419Sopenharmony_ci uv_rwlock_wrunlock(&mainAsync); 266cc290419Sopenharmony_ci uv_async_send(&asyncMainLoop); 267cc290419Sopenharmony_ci} 268cc290419Sopenharmony_ci 269cc290419Sopenharmony_ci// add commandflag ahead real buf data 270cc290419Sopenharmony_civoid HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size) 271cc290419Sopenharmony_ci{ 272cc290419Sopenharmony_ci StartTraceScope("HdcChannelBase::SendChannelWithCmd"); 273cc290419Sopenharmony_ci auto data = new uint8_t[size + sizeof(commandFlag)](); 274cc290419Sopenharmony_ci if (!data) { 275cc290419Sopenharmony_ci return; 276cc290419Sopenharmony_ci } 277cc290419Sopenharmony_ci 278cc290419Sopenharmony_ci if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) { 279cc290419Sopenharmony_ci delete[] data; 280cc290419Sopenharmony_ci return; 281cc290419Sopenharmony_ci } 282cc290419Sopenharmony_ci 283cc290419Sopenharmony_ci if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) { 284cc290419Sopenharmony_ci delete[] data; 285cc290419Sopenharmony_ci return; 286cc290419Sopenharmony_ci } 287cc290419Sopenharmony_ci 288cc290419Sopenharmony_ci SendChannel(hChannel, data, size + sizeof(commandFlag)); 289cc290419Sopenharmony_ci delete[] data; 290cc290419Sopenharmony_ci} 291cc290419Sopenharmony_ci 292cc290419Sopenharmony_civoid HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size) 293cc290419Sopenharmony_ci{ 294cc290419Sopenharmony_ci StartTraceScope("HdcChannelBase::SendWithCmd"); 295cc290419Sopenharmony_ci HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr)); 296cc290419Sopenharmony_ci if (!hChannel) { 297cc290419Sopenharmony_ci WRITE_LOG(LOG_FATAL, "SendWithCmd hChannel nullptr channelId:%u", channelId); 298cc290419Sopenharmony_ci return; 299cc290419Sopenharmony_ci } 300cc290419Sopenharmony_ci do { 301cc290419Sopenharmony_ci if (hChannel->isDead) { 302cc290419Sopenharmony_ci WRITE_LOG(LOG_FATAL, "SendWithCmd isDead channelId:%u", channelId); 303cc290419Sopenharmony_ci break; 304cc290419Sopenharmony_ci } 305cc290419Sopenharmony_ci SendChannelWithCmd(hChannel, commandFlag, bufPtr, size); 306cc290419Sopenharmony_ci } while (false); 307cc290419Sopenharmony_ci --hChannel->ref; 308cc290419Sopenharmony_ci} 309cc290419Sopenharmony_ci 310cc290419Sopenharmony_civoid HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size) 311cc290419Sopenharmony_ci{ 312cc290419Sopenharmony_ci StartTraceScope("HdcChannelBase::SendChannel"); 313cc290419Sopenharmony_ci uv_stream_t *sendStream = nullptr; 314cc290419Sopenharmony_ci int sizeNewBuf = size + DWORD_SERIALIZE_SIZE; 315cc290419Sopenharmony_ci auto data = new uint8_t[sizeNewBuf](); 316cc290419Sopenharmony_ci if (!data) { 317cc290419Sopenharmony_ci return; 318cc290419Sopenharmony_ci } 319cc290419Sopenharmony_ci *reinterpret_cast<uint32_t *>(data) = htonl(size); // big endian 320cc290419Sopenharmony_ci if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) { 321cc290419Sopenharmony_ci delete[] data; 322cc290419Sopenharmony_ci return; 323cc290419Sopenharmony_ci } 324cc290419Sopenharmony_ci if (hChannel->hWorkThread == uv_thread_self()) { 325cc290419Sopenharmony_ci sendStream = (uv_stream_t *)&hChannel->hWorkTCP; 326cc290419Sopenharmony_ci } else { 327cc290419Sopenharmony_ci sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP; 328cc290419Sopenharmony_ci } 329cc290419Sopenharmony_ci if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) { 330cc290419Sopenharmony_ci ++hChannel->ref; 331cc290419Sopenharmony_ci Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data); 332cc290419Sopenharmony_ci } else { 333cc290419Sopenharmony_ci delete[] data; 334cc290419Sopenharmony_ci } 335cc290419Sopenharmony_ci} 336cc290419Sopenharmony_ci 337cc290419Sopenharmony_ci// works only in current working thread 338cc290419Sopenharmony_civoid HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size) 339cc290419Sopenharmony_ci{ 340cc290419Sopenharmony_ci StartTraceScope("HdcChannelBase::Send"); 341cc290419Sopenharmony_ci HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr)); 342cc290419Sopenharmony_ci if (!hChannel) { 343cc290419Sopenharmony_ci WRITE_LOG(LOG_FATAL, "Send hChannel nullptr channelId:%u", channelId); 344cc290419Sopenharmony_ci return; 345cc290419Sopenharmony_ci } 346cc290419Sopenharmony_ci do { 347cc290419Sopenharmony_ci if (hChannel->isDead) { 348cc290419Sopenharmony_ci WRITE_LOG(LOG_FATAL, "Send isDead channelId:%u", channelId); 349cc290419Sopenharmony_ci break; 350cc290419Sopenharmony_ci } 351cc290419Sopenharmony_ci SendChannel(hChannel, bufPtr, size); 352cc290419Sopenharmony_ci } while (false); 353cc290419Sopenharmony_ci --hChannel->ref; 354cc290419Sopenharmony_ci} 355cc290419Sopenharmony_ci 356cc290419Sopenharmony_civoid HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf) 357cc290419Sopenharmony_ci{ 358cc290419Sopenharmony_ci HChannel context = (HChannel)handle->data; 359cc290419Sopenharmony_ci Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * BUF_EXTEND_SIZE); 360cc290419Sopenharmony_ci buf->base = (char *)context->ioBuf + context->availTailIndex; 361cc290419Sopenharmony_ci int size = context->bufSize - context->availTailIndex; 362cc290419Sopenharmony_ci buf->len = std::min(size, static_cast<int>(sizeWanted)); 363cc290419Sopenharmony_ci} 364cc290419Sopenharmony_ci 365cc290419Sopenharmony_ciuint32_t HdcChannelBase::GetChannelPseudoUid() 366cc290419Sopenharmony_ci{ 367cc290419Sopenharmony_ci uint32_t uid = 0; 368cc290419Sopenharmony_ci do { 369cc290419Sopenharmony_ci uid = Base::GetSecureRandom(); 370cc290419Sopenharmony_ci } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr); 371cc290419Sopenharmony_ci return uid; 372cc290419Sopenharmony_ci} 373cc290419Sopenharmony_ci 374cc290419Sopenharmony_ciuint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel) 375cc290419Sopenharmony_ci{ 376cc290419Sopenharmony_ci#ifdef CONFIG_USE_JEMALLOC_DFX_INIF 377cc290419Sopenharmony_ci mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE); 378cc290419Sopenharmony_ci mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE); 379cc290419Sopenharmony_ci#endif 380cc290419Sopenharmony_ci auto hChannel = new HdcChannel(); 381cc290419Sopenharmony_ci if (!hChannel) { 382cc290419Sopenharmony_ci WRITE_LOG(LOG_FATAL, "malloc channel failed"); 383cc290419Sopenharmony_ci return 0; 384cc290419Sopenharmony_ci } 385cc290419Sopenharmony_ci hChannel->stdinTty.data = nullptr; 386cc290419Sopenharmony_ci hChannel->stdoutTty.data = nullptr; 387cc290419Sopenharmony_ci uint32_t channelId = GetChannelPseudoUid(); 388cc290419Sopenharmony_ci if (isServerOrClient) { 389cc290419Sopenharmony_ci hChannel->serverOrClient = isServerOrClient; 390cc290419Sopenharmony_ci ++channelId; // Use different value for serverForClient&client in per process 391cc290419Sopenharmony_ci } 392cc290419Sopenharmony_ci uv_tcp_init(loopMain, &hChannel->hWorkTCP); 393cc290419Sopenharmony_ci ++hChannel->uvHandleRef; 394cc290419Sopenharmony_ci hChannel->hWorkThread = uv_thread_self(); 395cc290419Sopenharmony_ci hChannel->hWorkTCP.data = hChannel; 396cc290419Sopenharmony_ci hChannel->clsChannel = this; 397cc290419Sopenharmony_ci hChannel->channelId = channelId; 398cc290419Sopenharmony_ci (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t)); 399cc290419Sopenharmony_ci AdminChannel(OP_ADD, channelId, hChannel); 400cc290419Sopenharmony_ci *hOutChannel = hChannel; 401cc290419Sopenharmony_ci if (isServerOrClient) { 402cc290419Sopenharmony_ci WRITE_LOG(LOG_INFO, "Mallocchannel:%u", channelId); 403cc290419Sopenharmony_ci } else { 404cc290419Sopenharmony_ci WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId); 405cc290419Sopenharmony_ci } 406cc290419Sopenharmony_ci return channelId; 407cc290419Sopenharmony_ci} 408cc290419Sopenharmony_ci 409cc290419Sopenharmony_ci// work when libuv-handle at struct of HdcSession has all callback finished 410cc290419Sopenharmony_civoid HdcChannelBase::FreeChannelFinally(uv_idle_t *handle) 411cc290419Sopenharmony_ci{ 412cc290419Sopenharmony_ci HChannel hChannel = (HChannel)handle->data; 413cc290419Sopenharmony_ci HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel; 414cc290419Sopenharmony_ci if (hChannel->uvHandleRef > 0) { 415cc290419Sopenharmony_ci if (hChannel->serverOrClient) { 416cc290419Sopenharmony_ci WRITE_LOG(LOG_INFO, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u", 417cc290419Sopenharmony_ci hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId); 418cc290419Sopenharmony_ci } else { 419cc290419Sopenharmony_ci WRITE_LOG(LOG_DEBUG, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u", 420cc290419Sopenharmony_ci hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId); 421cc290419Sopenharmony_ci } 422cc290419Sopenharmony_ci return; 423cc290419Sopenharmony_ci } 424cc290419Sopenharmony_ci thisClass->NotifyInstanceChannelFree(hChannel); 425cc290419Sopenharmony_ci thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr); 426cc290419Sopenharmony_ci 427cc290419Sopenharmony_ci if (!hChannel->serverOrClient) { 428cc290419Sopenharmony_ci WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u sid:%u finish", 429cc290419Sopenharmony_ci hChannel->channelId, hChannel->targetSessionId); 430cc290419Sopenharmony_ci uv_stop(thisClass->loopMain); 431cc290419Sopenharmony_ci } else { 432cc290419Sopenharmony_ci WRITE_LOG(LOG_INFO, "!!!FreeChannelFinally channelId:%u sid:%u finish", 433cc290419Sopenharmony_ci hChannel->channelId, hChannel->targetSessionId); 434cc290419Sopenharmony_ci } 435cc290419Sopenharmony_ci#ifdef HDC_HOST 436cc290419Sopenharmony_ci Base::TryCloseHandle((const uv_handle_t *)&hChannel->hChildWorkTCP); 437cc290419Sopenharmony_ci#endif 438cc290419Sopenharmony_ci delete hChannel; 439cc290419Sopenharmony_ci Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback); 440cc290419Sopenharmony_ci} 441cc290419Sopenharmony_ci 442cc290419Sopenharmony_civoid HdcChannelBase::FreeChannelContinue(HChannel hChannel) 443cc290419Sopenharmony_ci{ 444cc290419Sopenharmony_ci auto closeChannelHandle = [](uv_handle_t *handle) -> void { 445cc290419Sopenharmony_ci if (handle->data == nullptr) { 446cc290419Sopenharmony_ci WRITE_LOG(LOG_DEBUG, "FreeChannelContinue handle->data is nullptr"); 447cc290419Sopenharmony_ci return; 448cc290419Sopenharmony_ci } 449cc290419Sopenharmony_ci HChannel channel = reinterpret_cast<HChannel>(handle->data); 450cc290419Sopenharmony_ci --channel->uvHandleRef; 451cc290419Sopenharmony_ci Base::TryCloseHandle((uv_handle_t *)handle); 452cc290419Sopenharmony_ci }; 453cc290419Sopenharmony_ci hChannel->availTailIndex = 0; 454cc290419Sopenharmony_ci if (hChannel->ioBuf) { 455cc290419Sopenharmony_ci delete[] hChannel->ioBuf; 456cc290419Sopenharmony_ci hChannel->ioBuf = nullptr; 457cc290419Sopenharmony_ci } 458cc290419Sopenharmony_ci if (!hChannel->serverOrClient) { 459cc290419Sopenharmony_ci Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle); 460cc290419Sopenharmony_ci Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle); 461cc290419Sopenharmony_ci } 462cc290419Sopenharmony_ci if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) { 463cc290419Sopenharmony_ci --hChannel->uvHandleRef; 464cc290419Sopenharmony_ci } else { 465cc290419Sopenharmony_ci Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle); 466cc290419Sopenharmony_ci } 467cc290419Sopenharmony_ci Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally); 468cc290419Sopenharmony_ci} 469cc290419Sopenharmony_ci 470cc290419Sopenharmony_civoid HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle) 471cc290419Sopenharmony_ci{ 472cc290419Sopenharmony_ci HChannel hChannel = (HChannel)handle->data; 473cc290419Sopenharmony_ci HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel; 474cc290419Sopenharmony_ci if (hChannel->ref > 0) { 475cc290419Sopenharmony_ci return; 476cc290419Sopenharmony_ci } 477cc290419Sopenharmony_ci thisClass->DispMntnInfo(hChannel); 478cc290419Sopenharmony_ci if (hChannel->hChildWorkTCP.loop) { 479cc290419Sopenharmony_ci auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0); 480cc290419Sopenharmony_ci bool ret = thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId); 481cc290419Sopenharmony_ci if (!ret) { 482cc290419Sopenharmony_ci WRITE_LOG(LOG_WARN, "FreeChannelOpeate deatch failed channelId:%u sid:%u", 483cc290419Sopenharmony_ci hChannel->channelId, hChannel->targetSessionId); 484cc290419Sopenharmony_ci hChannel->childCleared = true; 485cc290419Sopenharmony_ci } 486cc290419Sopenharmony_ci auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void { 487cc290419Sopenharmony_ci HChannel hChannel = (HChannel)handle->data; 488cc290419Sopenharmony_ci HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel; 489cc290419Sopenharmony_ci if (!hChannel->childCleared) { 490cc290419Sopenharmony_ci WRITE_LOG(LOG_WARN, "FreeChannelOpeate childCleared:%d channelId:%u sid:%u", 491cc290419Sopenharmony_ci hChannel->childCleared, hChannel->channelId, hChannel->targetSessionId); 492cc290419Sopenharmony_ci return; 493cc290419Sopenharmony_ci } 494cc290419Sopenharmony_ci Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback); 495cc290419Sopenharmony_ci thisClass->FreeChannelContinue(hChannel); 496cc290419Sopenharmony_ci }; 497cc290419Sopenharmony_ci Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue); 498cc290419Sopenharmony_ci } else { 499cc290419Sopenharmony_ci thisClass->FreeChannelContinue(hChannel); 500cc290419Sopenharmony_ci } 501cc290419Sopenharmony_ci Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback); 502cc290419Sopenharmony_ci} 503cc290419Sopenharmony_ci 504cc290419Sopenharmony_civoid HdcChannelBase::FreeChannel(const uint32_t channelId) 505cc290419Sopenharmony_ci{ 506cc290419Sopenharmony_ci if (threadChanneMain != uv_thread_self()) { 507cc290419Sopenharmony_ci PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0); 508cc290419Sopenharmony_ci WRITE_LOG(LOG_INFO, "FreeChannel not uv_thread_self channelid:%u", channelId); 509cc290419Sopenharmony_ci return; 510cc290419Sopenharmony_ci } 511cc290419Sopenharmony_ci HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr); 512cc290419Sopenharmony_ci do { 513cc290419Sopenharmony_ci if (!hChannel || hChannel->isDead) { 514cc290419Sopenharmony_ci WRITE_LOG(LOG_WARN, "FreeChannel hChannel nullptr or isDead channelid:%u", channelId); 515cc290419Sopenharmony_ci break; 516cc290419Sopenharmony_ci } 517cc290419Sopenharmony_ci WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId); 518cc290419Sopenharmony_ci Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT); // do immediately 519cc290419Sopenharmony_ci hChannel->isDead = true; 520cc290419Sopenharmony_ci } while (false); 521cc290419Sopenharmony_ci} 522cc290419Sopenharmony_ci 523cc290419Sopenharmony_ciHChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput) 524cc290419Sopenharmony_ci{ 525cc290419Sopenharmony_ci HChannel hRet = nullptr; 526cc290419Sopenharmony_ci switch (op) { 527cc290419Sopenharmony_ci case OP_ADD: 528cc290419Sopenharmony_ci uv_rwlock_wrlock(&lockMapChannel); 529cc290419Sopenharmony_ci mapChannel[channelId] = hInput; 530cc290419Sopenharmony_ci uv_rwlock_wrunlock(&lockMapChannel); 531cc290419Sopenharmony_ci break; 532cc290419Sopenharmony_ci case OP_REMOVE: 533cc290419Sopenharmony_ci uv_rwlock_wrlock(&lockMapChannel); 534cc290419Sopenharmony_ci mapChannel.erase(channelId); 535cc290419Sopenharmony_ci uv_rwlock_wrunlock(&lockMapChannel); 536cc290419Sopenharmony_ci break; 537cc290419Sopenharmony_ci case OP_QUERY: 538cc290419Sopenharmony_ci uv_rwlock_rdlock(&lockMapChannel); 539cc290419Sopenharmony_ci if (mapChannel.count(channelId)) { 540cc290419Sopenharmony_ci hRet = mapChannel[channelId]; 541cc290419Sopenharmony_ci } 542cc290419Sopenharmony_ci uv_rwlock_rdunlock(&lockMapChannel); 543cc290419Sopenharmony_ci break; 544cc290419Sopenharmony_ci case OP_QUERY_REF: 545cc290419Sopenharmony_ci uv_rwlock_wrlock(&lockMapChannel); 546cc290419Sopenharmony_ci if (mapChannel.count(channelId)) { 547cc290419Sopenharmony_ci hRet = mapChannel[channelId]; 548cc290419Sopenharmony_ci ++hRet->ref; 549cc290419Sopenharmony_ci } 550cc290419Sopenharmony_ci uv_rwlock_wrunlock(&lockMapChannel); 551cc290419Sopenharmony_ci break; 552cc290419Sopenharmony_ci case OP_UPDATE: 553cc290419Sopenharmony_ci uv_rwlock_wrlock(&lockMapChannel); 554cc290419Sopenharmony_ci // remove old 555cc290419Sopenharmony_ci mapChannel.erase(channelId); 556cc290419Sopenharmony_ci mapChannel[hInput->channelId] = hInput; 557cc290419Sopenharmony_ci uv_rwlock_wrunlock(&lockMapChannel); 558cc290419Sopenharmony_ci break; 559cc290419Sopenharmony_ci default: 560cc290419Sopenharmony_ci break; 561cc290419Sopenharmony_ci } 562cc290419Sopenharmony_ci return hRet; 563cc290419Sopenharmony_ci} 564cc290419Sopenharmony_ci 565cc290419Sopenharmony_civoid HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size) 566cc290419Sopenharmony_ci{ 567cc290419Sopenharmony_ci StartTraceScope("HdcChannelBase::EchoToClient"); 568cc290419Sopenharmony_ci uv_stream_t *sendStream = nullptr; 569cc290419Sopenharmony_ci int sizeNewBuf = size + DWORD_SERIALIZE_SIZE; 570cc290419Sopenharmony_ci auto data = new uint8_t[sizeNewBuf](); 571cc290419Sopenharmony_ci if (!data) { 572cc290419Sopenharmony_ci return; 573cc290419Sopenharmony_ci } 574cc290419Sopenharmony_ci *reinterpret_cast<uint32_t *>(data) = htonl(size); 575cc290419Sopenharmony_ci if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) { 576cc290419Sopenharmony_ci delete[] data; 577cc290419Sopenharmony_ci return; 578cc290419Sopenharmony_ci } 579cc290419Sopenharmony_ci sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP; 580cc290419Sopenharmony_ci if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) { 581cc290419Sopenharmony_ci ++hChannel->ref; 582cc290419Sopenharmony_ci Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data); 583cc290419Sopenharmony_ci } else { 584cc290419Sopenharmony_ci WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId); 585cc290419Sopenharmony_ci delete[] data; 586cc290419Sopenharmony_ci } 587cc290419Sopenharmony_ci} 588cc290419Sopenharmony_ci 589cc290419Sopenharmony_civoid HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo) 590cc290419Sopenharmony_ci{ 591cc290419Sopenharmony_ci for (auto v : mapChannel) { 592cc290419Sopenharmony_ci HChannel hChannel = (HChannel)v.second; 593cc290419Sopenharmony_ci if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) { 594cc290419Sopenharmony_ci WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str()); 595cc290419Sopenharmony_ci EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size()); 596cc290419Sopenharmony_ci } 597cc290419Sopenharmony_ci } 598cc290419Sopenharmony_ci} 599cc290419Sopenharmony_ci 600cc290419Sopenharmony_civoid HdcChannelBase::DispMntnInfo(HChannel hChannel) 601cc290419Sopenharmony_ci{ 602cc290419Sopenharmony_ci if (!hChannel) { 603cc290419Sopenharmony_ci WRITE_LOG(LOG_WARN, "prt is null"); 604cc290419Sopenharmony_ci return; 605cc290419Sopenharmony_ci } 606cc290419Sopenharmony_ci WRITE_LOG(LOG_DEBUG, "channel info: id:%u isDead:%d ref:%u, writeFailedTimes:%u", 607cc290419Sopenharmony_ci hChannel->channelId, hChannel->isDead, uint32_t(hChannel->ref), uint32_t(hChannel->writeFailedTimes)); 608cc290419Sopenharmony_ci} 609cc290419Sopenharmony_ci} 610