1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "channel.h"
16 namespace Hdc {
HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)17 HdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)
18 {
19     SetChannelTCPString(addrString);
20     isServerOrClient = serverOrClient;
21     loopMain = loopMainIn;
22     threadChanneMain = uv_thread_self();
23     uv_rwlock_init(&mainAsync);
24     uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback);
25     uv_rwlock_init(&lockMapChannel);
26 }
27 
~HdcChannelBase()28 HdcChannelBase::~HdcChannelBase()
29 {
30     ClearChannels();
31     // clear
32     if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
33         uv_close((uv_handle_t *)&asyncMainLoop, nullptr);
34     }
35 
36     uv_rwlock_destroy(&mainAsync);
37     uv_rwlock_destroy(&lockMapChannel);
38 }
39 
GetChannelHandshake(string &connectKey) const40 vector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const
41 {
42     vector<uint8_t> ret;
43     struct ChannelHandShake handshake = {};
44     if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) {
45         return ret;
46     }
47     if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) {
48         return ret;
49     }
50     ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake));
51     return ret;
52 }
53 
SetChannelTCPString(const string &addrString)54 bool HdcChannelBase::SetChannelTCPString(const string &addrString)
55 {
56     bool ret = false;
57     while (true) {
58         if (addrString.find(":") == string::npos) {
59             break;
60         }
61         std::size_t found = addrString.find_last_of(":");
62         if (found == string::npos) {
63             break;
64         }
65 
66         string host = addrString.substr(0, found);
67         string port = addrString.substr(found + 1);
68 
69         channelPort = std::atoi(port.c_str());
70         sockaddr_in addrv4;
71         sockaddr_in6 addrv6;
72         if (!channelPort) {
73             break;
74         }
75 
76         if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 &&
77             uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) {
78             break;
79         }
80         channelHost = host;
81         channelHostPort = addrString;
82         ret = true;
83         break;
84     }
85     if (!ret) {
86         channelPort = 0;
87         channelHost = STRING_EMPTY;
88         channelHostPort = STRING_EMPTY;
89     }
90     return ret;
91 }
92 
ClearChannels()93 void HdcChannelBase::ClearChannels()
94 {
95     for (auto v : mapChannel) {
96         HChannel hChannel = (HChannel)v.second;
97         if (!hChannel->isDead) {
98             FreeChannel(hChannel->channelId);
99         }
100     }
101 }
102 
WorkerPendding()103 void HdcChannelBase::WorkerPendding()
104 {
105     WRITE_LOG(LOG_DEBUG, "Begin host channel pendding");
106     uv_run(loopMain, UV_RUN_DEFAULT);
107     uv_loop_close(loopMain);
108 }
109 
ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)110 void HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
111 {
112     StartTraceScope("HdcChannelBase::ReadStream");
113     int size = 0;
114     int indexBuf = 0;
115     int childRet = 0;
116     bool needExit = false;
117     HChannel hChannel = (HChannel)tcp->data;
118     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
119     uint32_t channelId = hChannel->channelId;
120 
121     if (nread == UV_ENOBUFS) {
122         WRITE_LOG(LOG_FATAL, "ReadStream nobufs channelId:%u", channelId);
123         return;
124     } else if (nread == 0) {
125         // maybe just after accept, second client req
126         WRITE_LOG(LOG_DEBUG, "ReadStream idle read channelId:%u", channelId);
127         return;
128     } else if (nread < 0) {
129         Base::TryCloseHandle((uv_handle_t *)tcp);
130         constexpr int bufSize = 1024;
131         char buffer[bufSize] = { 0 };
132         uv_err_name_r(nread, buffer, bufSize);
133         WRITE_LOG(LOG_DEBUG, "ReadStream channelId:%u failed:%s", channelId, buffer);
134         needExit = true;
135         goto Finish;
136     } else {
137         hChannel->availTailIndex += nread;
138     }
139     while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) {
140         size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf));  // big endian
141         if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) {
142             WRITE_LOG(LOG_FATAL, "ReadStream size:%d channelId:%u", size, channelId);
143             needExit = true;
144             break;
145         }
146         if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) {
147             break;
148         }
149         childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) +
150                                           DWORD_SERIALIZE_SIZE + indexBuf, size);
151         if (childRet < 0) {
152             WRITE_LOG(LOG_WARN, "ReadStream childRet:%d channelId:%u keepAlive:%d",
153                 childRet, channelId, hChannel->keepAlive);
154             if (!hChannel->keepAlive) {
155                 needExit = true;
156                 break;
157             }
158         }
159         // update io
160         hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size);
161         indexBuf += DWORD_SERIALIZE_SIZE + size;
162     }
163     if (indexBuf > 0 && hChannel->availTailIndex > 0) {
164         if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) {
165             needExit = true;
166             goto Finish;
167         }
168     }
169 
170 Finish:
171     if (needExit) {
172         thisClass->FreeChannel(hChannel->channelId);
173         WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish channelId:%u", channelId);
174     }
175 }
176 
WriteCallback(uv_write_t *req, int status)177 void HdcChannelBase::WriteCallback(uv_write_t *req, int status)
178 {
179     HChannel hChannel = (HChannel)req->handle->data;
180     --hChannel->ref;
181     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
182     if (status < 0) {
183         hChannel->writeFailedTimes++;
184         Base::TryCloseHandle((uv_handle_t *)req->handle);
185         if (!hChannel->isDead && !hChannel->ref) {
186             thisClass->FreeChannel(hChannel->channelId);
187         }
188     }
189     delete[]((uint8_t *)req->data);
190     delete req;
191 }
192 
AsyncMainLoopTask(uv_idle_t *handle)193 void HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle)
194 {
195     AsyncParam *param = (AsyncParam *)handle->data;
196     HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass;
197 
198     switch (param->method) {
199         case ASYNC_FREE_CHANNEL: {
200             // alloc/release should pair in main thread.
201             thisClass->FreeChannel(param->sid);
202             break;
203         }
204         default:
205             break;
206     }
207     if (param->data) {
208         delete[]((uint8_t *)param->data);
209     }
210     delete param;
211     uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
212 }
213 
214 // multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected.
215 // eg: if uv_async_send() 5 times before callback calling,it will be called only once.
216 // if uv_async_send() is called again after callback calling, it will be called again.
MainAsyncCallback(uv_async_t *handle)217 void HdcChannelBase::MainAsyncCallback(uv_async_t *handle)
218 {
219     HdcChannelBase *thisClass = (HdcChannelBase *)handle->data;
220     if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) {
221         WRITE_LOG(LOG_WARN, "MainAsyncCallback uv_is_closing loopMain");
222         return;
223     }
224     list<void *>::iterator i;
225     list<void *> &lst = thisClass->lstMainThreadOP;
226     uv_rwlock_wrlock(&thisClass->mainAsync);
227     for (i = lst.begin(); i != lst.end();) {
228         AsyncParam *param = (AsyncParam *)*i;
229         Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask);
230         i = lst.erase(i);
231     }
232     uv_rwlock_wrunlock(&thisClass->mainAsync);
233 }
234 
PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data, const int dataSize)235 void HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data,
236                                       const int dataSize)
237 {
238     if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
239         WRITE_LOG(LOG_WARN, "PushAsyncMessage uv_is_closing asyncMainLoop");
240         return;
241     }
242     auto param = new AsyncParam();
243     if (!param) {
244         return;
245     }
246     param->sid = channelId;  // Borrow SID storage
247     param->thisClass = this;
248     param->method = method;
249     if (dataSize > 0) {
250         param->dataSize = dataSize;
251         param->data = new uint8_t[param->dataSize]();
252         if (!param->data) {
253             delete param;
254             return;
255         }
256         if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
257             delete[]((uint8_t *)param->data);
258             delete param;
259             return;
260         }
261     }
262     asyncMainLoop.data = this;
263     uv_rwlock_wrlock(&mainAsync);
264     lstMainThreadOP.push_back(param);
265     uv_rwlock_wrunlock(&mainAsync);
266     uv_async_send(&asyncMainLoop);
267 }
268 
269 // add commandflag ahead real buf data
SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)270 void HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
271 {
272     StartTraceScope("HdcChannelBase::SendChannelWithCmd");
273     auto data = new uint8_t[size + sizeof(commandFlag)]();
274     if (!data) {
275         return;
276     }
277 
278     if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) {
279         delete[] data;
280         return;
281     }
282 
283     if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) {
284         delete[] data;
285         return;
286     }
287 
288     SendChannel(hChannel, data, size + sizeof(commandFlag));
289     delete[] data;
290 }
291 
SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)292 void HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
293 {
294     StartTraceScope("HdcChannelBase::SendWithCmd");
295     HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
296     if (!hChannel) {
297         WRITE_LOG(LOG_FATAL, "SendWithCmd hChannel nullptr channelId:%u", channelId);
298         return;
299     }
300     do {
301         if (hChannel->isDead) {
302             WRITE_LOG(LOG_FATAL, "SendWithCmd isDead channelId:%u", channelId);
303             break;
304         }
305         SendChannelWithCmd(hChannel, commandFlag, bufPtr, size);
306     } while (false);
307     --hChannel->ref;
308 }
309 
SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size)310 void HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size)
311 {
312     StartTraceScope("HdcChannelBase::SendChannel");
313     uv_stream_t *sendStream = nullptr;
314     int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
315     auto data = new uint8_t[sizeNewBuf]();
316     if (!data) {
317         return;
318     }
319     *reinterpret_cast<uint32_t *>(data) = htonl(size);  // big endian
320     if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
321         delete[] data;
322         return;
323     }
324     if (hChannel->hWorkThread == uv_thread_self()) {
325         sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
326     } else {
327         sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
328     }
329     if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
330         ++hChannel->ref;
331         Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
332     } else {
333         delete[] data;
334     }
335 }
336 
337 // works only in current working thread
Send(const uint32_t channelId, uint8_t *bufPtr, const int size)338 void HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size)
339 {
340     StartTraceScope("HdcChannelBase::Send");
341     HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
342     if (!hChannel) {
343         WRITE_LOG(LOG_FATAL, "Send hChannel nullptr channelId:%u", channelId);
344         return;
345     }
346     do {
347         if (hChannel->isDead) {
348             WRITE_LOG(LOG_FATAL, "Send isDead channelId:%u", channelId);
349             break;
350         }
351         SendChannel(hChannel, bufPtr, size);
352     } while (false);
353     --hChannel->ref;
354 }
355 
AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)356 void HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
357 {
358     HChannel context = (HChannel)handle->data;
359     Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * BUF_EXTEND_SIZE);
360     buf->base = (char *)context->ioBuf + context->availTailIndex;
361     int size = context->bufSize - context->availTailIndex;
362     buf->len = std::min(size, static_cast<int>(sizeWanted));
363 }
364 
GetChannelPseudoUid()365 uint32_t HdcChannelBase::GetChannelPseudoUid()
366 {
367     uint32_t uid = 0;
368     do {
369         uid = Base::GetSecureRandom();
370     } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr);
371     return uid;
372 }
373 
MallocChannel(HChannel *hOutChannel)374 uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
375 {
376 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
377     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
378     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
379 #endif
380     auto hChannel = new HdcChannel();
381     if (!hChannel) {
382         WRITE_LOG(LOG_FATAL, "malloc channel failed");
383         return 0;
384     }
385     hChannel->stdinTty.data = nullptr;
386     hChannel->stdoutTty.data = nullptr;
387     uint32_t channelId = GetChannelPseudoUid();
388     if (isServerOrClient) {
389         hChannel->serverOrClient = isServerOrClient;
390         ++channelId;  // Use different value for serverForClient&client in per process
391     }
392     uv_tcp_init(loopMain, &hChannel->hWorkTCP);
393     ++hChannel->uvHandleRef;
394     hChannel->hWorkThread = uv_thread_self();
395     hChannel->hWorkTCP.data = hChannel;
396     hChannel->clsChannel = this;
397     hChannel->channelId = channelId;
398     (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
399     AdminChannel(OP_ADD, channelId, hChannel);
400     *hOutChannel = hChannel;
401     if (isServerOrClient) {
402         WRITE_LOG(LOG_INFO, "Mallocchannel:%u", channelId);
403     } else {
404         WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId);
405     }
406     return channelId;
407 }
408 
409 // work when libuv-handle at struct of HdcSession has all callback finished
FreeChannelFinally(uv_idle_t *handle)410 void HdcChannelBase::FreeChannelFinally(uv_idle_t *handle)
411 {
412     HChannel hChannel = (HChannel)handle->data;
413     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
414     if (hChannel->uvHandleRef > 0) {
415         if (hChannel->serverOrClient) {
416             WRITE_LOG(LOG_INFO, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
417                 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
418         } else {
419             WRITE_LOG(LOG_DEBUG, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
420                 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
421         }
422         return;
423     }
424     thisClass->NotifyInstanceChannelFree(hChannel);
425     thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr);
426 
427     if (!hChannel->serverOrClient) {
428         WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u sid:%u finish",
429             hChannel->channelId, hChannel->targetSessionId);
430         uv_stop(thisClass->loopMain);
431     } else {
432         WRITE_LOG(LOG_INFO, "!!!FreeChannelFinally channelId:%u sid:%u finish",
433             hChannel->channelId, hChannel->targetSessionId);
434     }
435 #ifdef HDC_HOST
436     Base::TryCloseHandle((const uv_handle_t *)&hChannel->hChildWorkTCP);
437 #endif
438     delete hChannel;
439     Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
440 }
441 
FreeChannelContinue(HChannel hChannel)442 void HdcChannelBase::FreeChannelContinue(HChannel hChannel)
443 {
444     auto closeChannelHandle = [](uv_handle_t *handle) -> void {
445         if (handle->data == nullptr) {
446             WRITE_LOG(LOG_DEBUG, "FreeChannelContinue handle->data is nullptr");
447             return;
448         }
449         HChannel channel = reinterpret_cast<HChannel>(handle->data);
450         --channel->uvHandleRef;
451         Base::TryCloseHandle((uv_handle_t *)handle);
452     };
453     hChannel->availTailIndex = 0;
454     if (hChannel->ioBuf) {
455         delete[] hChannel->ioBuf;
456         hChannel->ioBuf = nullptr;
457     }
458     if (!hChannel->serverOrClient) {
459         Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle);
460         Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle);
461     }
462     if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
463         --hChannel->uvHandleRef;
464     } else {
465         Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
466     }
467     Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally);
468 }
469 
FreeChannelOpeate(uv_timer_t *handle)470 void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
471 {
472     HChannel hChannel = (HChannel)handle->data;
473     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
474     if (hChannel->ref > 0) {
475         return;
476     }
477     thisClass->DispMntnInfo(hChannel);
478     if (hChannel->hChildWorkTCP.loop) {
479         auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
480         bool ret = thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
481         if (!ret) {
482             WRITE_LOG(LOG_WARN, "FreeChannelOpeate deatch failed channelId:%u sid:%u",
483                 hChannel->channelId, hChannel->targetSessionId);
484             hChannel->childCleared = true;
485         }
486         auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
487             HChannel hChannel = (HChannel)handle->data;
488             HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
489             if (!hChannel->childCleared) {
490                 WRITE_LOG(LOG_WARN, "FreeChannelOpeate childCleared:%d channelId:%u sid:%u",
491                     hChannel->childCleared, hChannel->channelId, hChannel->targetSessionId);
492                 return;
493             }
494             Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
495             thisClass->FreeChannelContinue(hChannel);
496         };
497         Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
498     } else {
499         thisClass->FreeChannelContinue(hChannel);
500     }
501     Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
502 }
503 
FreeChannel(const uint32_t channelId)504 void HdcChannelBase::FreeChannel(const uint32_t channelId)
505 {
506     if (threadChanneMain != uv_thread_self()) {
507         PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0);
508         WRITE_LOG(LOG_INFO, "FreeChannel not uv_thread_self channelid:%u", channelId);
509         return;
510     }
511     HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr);
512     do {
513         if (!hChannel || hChannel->isDead) {
514             WRITE_LOG(LOG_WARN, "FreeChannel hChannel nullptr or isDead channelid:%u", channelId);
515             break;
516         }
517         WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId);
518         Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT);  // do immediately
519         hChannel->isDead = true;
520     } while (false);
521 }
522 
AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)523 HChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)
524 {
525     HChannel hRet = nullptr;
526     switch (op) {
527         case OP_ADD:
528             uv_rwlock_wrlock(&lockMapChannel);
529             mapChannel[channelId] = hInput;
530             uv_rwlock_wrunlock(&lockMapChannel);
531             break;
532         case OP_REMOVE:
533             uv_rwlock_wrlock(&lockMapChannel);
534             mapChannel.erase(channelId);
535             uv_rwlock_wrunlock(&lockMapChannel);
536             break;
537         case OP_QUERY:
538             uv_rwlock_rdlock(&lockMapChannel);
539             if (mapChannel.count(channelId)) {
540                 hRet = mapChannel[channelId];
541             }
542             uv_rwlock_rdunlock(&lockMapChannel);
543             break;
544         case OP_QUERY_REF:
545             uv_rwlock_wrlock(&lockMapChannel);
546             if (mapChannel.count(channelId)) {
547                 hRet = mapChannel[channelId];
548                 ++hRet->ref;
549             }
550             uv_rwlock_wrunlock(&lockMapChannel);
551             break;
552         case OP_UPDATE:
553             uv_rwlock_wrlock(&lockMapChannel);
554             // remove old
555             mapChannel.erase(channelId);
556             mapChannel[hInput->channelId] = hInput;
557             uv_rwlock_wrunlock(&lockMapChannel);
558             break;
559         default:
560             break;
561     }
562     return hRet;
563 }
564 
EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)565 void HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)
566 {
567     StartTraceScope("HdcChannelBase::EchoToClient");
568     uv_stream_t *sendStream = nullptr;
569     int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
570     auto data = new uint8_t[sizeNewBuf]();
571     if (!data) {
572         return;
573     }
574     *reinterpret_cast<uint32_t *>(data) = htonl(size);
575     if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
576         delete[] data;
577         return;
578     }
579     sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
580     if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
581         ++hChannel->ref;
582         Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
583     } else {
584         WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId);
585         delete[] data;
586     }
587 }
588 
EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)589 void HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)
590 {
591     for (auto v : mapChannel) {
592         HChannel hChannel = (HChannel)v.second;
593         if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) {
594             WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str());
595             EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size());
596         }
597     }
598 }
599 
DispMntnInfo(HChannel hChannel)600 void HdcChannelBase::DispMntnInfo(HChannel hChannel)
601 {
602     if (!hChannel) {
603         WRITE_LOG(LOG_WARN, "prt is null");
604         return;
605     }
606     WRITE_LOG(LOG_DEBUG, "channel info: id:%u isDead:%d ref:%u, writeFailedTimes:%u",
607         hChannel->channelId, hChannel->isDead, uint32_t(hChannel->ref), uint32_t(hChannel->writeFailedTimes));
608 }
609 }
610