1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "channel.h"
16 namespace Hdc {
HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)17 HdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)
18 {
19 SetChannelTCPString(addrString);
20 isServerOrClient = serverOrClient;
21 loopMain = loopMainIn;
22 threadChanneMain = uv_thread_self();
23 uv_rwlock_init(&mainAsync);
24 uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback);
25 uv_rwlock_init(&lockMapChannel);
26 }
27
~HdcChannelBase()28 HdcChannelBase::~HdcChannelBase()
29 {
30 ClearChannels();
31 // clear
32 if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
33 uv_close((uv_handle_t *)&asyncMainLoop, nullptr);
34 }
35
36 uv_rwlock_destroy(&mainAsync);
37 uv_rwlock_destroy(&lockMapChannel);
38 }
39
GetChannelHandshake(string &connectKey) const40 vector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const
41 {
42 vector<uint8_t> ret;
43 struct ChannelHandShake handshake = {};
44 if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) {
45 return ret;
46 }
47 if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) {
48 return ret;
49 }
50 ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake));
51 return ret;
52 }
53
SetChannelTCPString(const string &addrString)54 bool HdcChannelBase::SetChannelTCPString(const string &addrString)
55 {
56 bool ret = false;
57 while (true) {
58 if (addrString.find(":") == string::npos) {
59 break;
60 }
61 std::size_t found = addrString.find_last_of(":");
62 if (found == string::npos) {
63 break;
64 }
65
66 string host = addrString.substr(0, found);
67 string port = addrString.substr(found + 1);
68
69 channelPort = std::atoi(port.c_str());
70 sockaddr_in addrv4;
71 sockaddr_in6 addrv6;
72 if (!channelPort) {
73 break;
74 }
75
76 if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 &&
77 uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) {
78 break;
79 }
80 channelHost = host;
81 channelHostPort = addrString;
82 ret = true;
83 break;
84 }
85 if (!ret) {
86 channelPort = 0;
87 channelHost = STRING_EMPTY;
88 channelHostPort = STRING_EMPTY;
89 }
90 return ret;
91 }
92
ClearChannels()93 void HdcChannelBase::ClearChannels()
94 {
95 for (auto v : mapChannel) {
96 HChannel hChannel = (HChannel)v.second;
97 if (!hChannel->isDead) {
98 FreeChannel(hChannel->channelId);
99 }
100 }
101 }
102
WorkerPendding()103 void HdcChannelBase::WorkerPendding()
104 {
105 WRITE_LOG(LOG_DEBUG, "Begin host channel pendding");
106 uv_run(loopMain, UV_RUN_DEFAULT);
107 uv_loop_close(loopMain);
108 }
109
ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)110 void HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
111 {
112 StartTraceScope("HdcChannelBase::ReadStream");
113 int size = 0;
114 int indexBuf = 0;
115 int childRet = 0;
116 bool needExit = false;
117 HChannel hChannel = (HChannel)tcp->data;
118 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
119 uint32_t channelId = hChannel->channelId;
120
121 if (nread == UV_ENOBUFS) {
122 WRITE_LOG(LOG_FATAL, "ReadStream nobufs channelId:%u", channelId);
123 return;
124 } else if (nread == 0) {
125 // maybe just after accept, second client req
126 WRITE_LOG(LOG_DEBUG, "ReadStream idle read channelId:%u", channelId);
127 return;
128 } else if (nread < 0) {
129 Base::TryCloseHandle((uv_handle_t *)tcp);
130 constexpr int bufSize = 1024;
131 char buffer[bufSize] = { 0 };
132 uv_err_name_r(nread, buffer, bufSize);
133 WRITE_LOG(LOG_DEBUG, "ReadStream channelId:%u failed:%s", channelId, buffer);
134 needExit = true;
135 goto Finish;
136 } else {
137 hChannel->availTailIndex += nread;
138 }
139 while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) {
140 size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf)); // big endian
141 if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) {
142 WRITE_LOG(LOG_FATAL, "ReadStream size:%d channelId:%u", size, channelId);
143 needExit = true;
144 break;
145 }
146 if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) {
147 break;
148 }
149 childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) +
150 DWORD_SERIALIZE_SIZE + indexBuf, size);
151 if (childRet < 0) {
152 WRITE_LOG(LOG_WARN, "ReadStream childRet:%d channelId:%u keepAlive:%d",
153 childRet, channelId, hChannel->keepAlive);
154 if (!hChannel->keepAlive) {
155 needExit = true;
156 break;
157 }
158 }
159 // update io
160 hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size);
161 indexBuf += DWORD_SERIALIZE_SIZE + size;
162 }
163 if (indexBuf > 0 && hChannel->availTailIndex > 0) {
164 if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) {
165 needExit = true;
166 goto Finish;
167 }
168 }
169
170 Finish:
171 if (needExit) {
172 thisClass->FreeChannel(hChannel->channelId);
173 WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish channelId:%u", channelId);
174 }
175 }
176
WriteCallback(uv_write_t *req, int status)177 void HdcChannelBase::WriteCallback(uv_write_t *req, int status)
178 {
179 HChannel hChannel = (HChannel)req->handle->data;
180 --hChannel->ref;
181 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
182 if (status < 0) {
183 hChannel->writeFailedTimes++;
184 Base::TryCloseHandle((uv_handle_t *)req->handle);
185 if (!hChannel->isDead && !hChannel->ref) {
186 thisClass->FreeChannel(hChannel->channelId);
187 }
188 }
189 delete[]((uint8_t *)req->data);
190 delete req;
191 }
192
AsyncMainLoopTask(uv_idle_t *handle)193 void HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle)
194 {
195 AsyncParam *param = (AsyncParam *)handle->data;
196 HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass;
197
198 switch (param->method) {
199 case ASYNC_FREE_CHANNEL: {
200 // alloc/release should pair in main thread.
201 thisClass->FreeChannel(param->sid);
202 break;
203 }
204 default:
205 break;
206 }
207 if (param->data) {
208 delete[]((uint8_t *)param->data);
209 }
210 delete param;
211 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
212 }
213
214 // multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected.
215 // eg: if uv_async_send() 5 times before callback calling,it will be called only once.
216 // if uv_async_send() is called again after callback calling, it will be called again.
MainAsyncCallback(uv_async_t *handle)217 void HdcChannelBase::MainAsyncCallback(uv_async_t *handle)
218 {
219 HdcChannelBase *thisClass = (HdcChannelBase *)handle->data;
220 if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) {
221 WRITE_LOG(LOG_WARN, "MainAsyncCallback uv_is_closing loopMain");
222 return;
223 }
224 list<void *>::iterator i;
225 list<void *> &lst = thisClass->lstMainThreadOP;
226 uv_rwlock_wrlock(&thisClass->mainAsync);
227 for (i = lst.begin(); i != lst.end();) {
228 AsyncParam *param = (AsyncParam *)*i;
229 Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask);
230 i = lst.erase(i);
231 }
232 uv_rwlock_wrunlock(&thisClass->mainAsync);
233 }
234
PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data, const int dataSize)235 void HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data,
236 const int dataSize)
237 {
238 if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
239 WRITE_LOG(LOG_WARN, "PushAsyncMessage uv_is_closing asyncMainLoop");
240 return;
241 }
242 auto param = new AsyncParam();
243 if (!param) {
244 return;
245 }
246 param->sid = channelId; // Borrow SID storage
247 param->thisClass = this;
248 param->method = method;
249 if (dataSize > 0) {
250 param->dataSize = dataSize;
251 param->data = new uint8_t[param->dataSize]();
252 if (!param->data) {
253 delete param;
254 return;
255 }
256 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
257 delete[]((uint8_t *)param->data);
258 delete param;
259 return;
260 }
261 }
262 asyncMainLoop.data = this;
263 uv_rwlock_wrlock(&mainAsync);
264 lstMainThreadOP.push_back(param);
265 uv_rwlock_wrunlock(&mainAsync);
266 uv_async_send(&asyncMainLoop);
267 }
268
269 // add commandflag ahead real buf data
SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)270 void HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
271 {
272 StartTraceScope("HdcChannelBase::SendChannelWithCmd");
273 auto data = new uint8_t[size + sizeof(commandFlag)]();
274 if (!data) {
275 return;
276 }
277
278 if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) {
279 delete[] data;
280 return;
281 }
282
283 if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) {
284 delete[] data;
285 return;
286 }
287
288 SendChannel(hChannel, data, size + sizeof(commandFlag));
289 delete[] data;
290 }
291
SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)292 void HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
293 {
294 StartTraceScope("HdcChannelBase::SendWithCmd");
295 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
296 if (!hChannel) {
297 WRITE_LOG(LOG_FATAL, "SendWithCmd hChannel nullptr channelId:%u", channelId);
298 return;
299 }
300 do {
301 if (hChannel->isDead) {
302 WRITE_LOG(LOG_FATAL, "SendWithCmd isDead channelId:%u", channelId);
303 break;
304 }
305 SendChannelWithCmd(hChannel, commandFlag, bufPtr, size);
306 } while (false);
307 --hChannel->ref;
308 }
309
SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size)310 void HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size)
311 {
312 StartTraceScope("HdcChannelBase::SendChannel");
313 uv_stream_t *sendStream = nullptr;
314 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
315 auto data = new uint8_t[sizeNewBuf]();
316 if (!data) {
317 return;
318 }
319 *reinterpret_cast<uint32_t *>(data) = htonl(size); // big endian
320 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
321 delete[] data;
322 return;
323 }
324 if (hChannel->hWorkThread == uv_thread_self()) {
325 sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
326 } else {
327 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
328 }
329 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
330 ++hChannel->ref;
331 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
332 } else {
333 delete[] data;
334 }
335 }
336
337 // works only in current working thread
Send(const uint32_t channelId, uint8_t *bufPtr, const int size)338 void HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size)
339 {
340 StartTraceScope("HdcChannelBase::Send");
341 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
342 if (!hChannel) {
343 WRITE_LOG(LOG_FATAL, "Send hChannel nullptr channelId:%u", channelId);
344 return;
345 }
346 do {
347 if (hChannel->isDead) {
348 WRITE_LOG(LOG_FATAL, "Send isDead channelId:%u", channelId);
349 break;
350 }
351 SendChannel(hChannel, bufPtr, size);
352 } while (false);
353 --hChannel->ref;
354 }
355
AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)356 void HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
357 {
358 HChannel context = (HChannel)handle->data;
359 Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * BUF_EXTEND_SIZE);
360 buf->base = (char *)context->ioBuf + context->availTailIndex;
361 int size = context->bufSize - context->availTailIndex;
362 buf->len = std::min(size, static_cast<int>(sizeWanted));
363 }
364
GetChannelPseudoUid()365 uint32_t HdcChannelBase::GetChannelPseudoUid()
366 {
367 uint32_t uid = 0;
368 do {
369 uid = Base::GetSecureRandom();
370 } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr);
371 return uid;
372 }
373
MallocChannel(HChannel *hOutChannel)374 uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
375 {
376 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
377 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
378 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
379 #endif
380 auto hChannel = new HdcChannel();
381 if (!hChannel) {
382 WRITE_LOG(LOG_FATAL, "malloc channel failed");
383 return 0;
384 }
385 hChannel->stdinTty.data = nullptr;
386 hChannel->stdoutTty.data = nullptr;
387 uint32_t channelId = GetChannelPseudoUid();
388 if (isServerOrClient) {
389 hChannel->serverOrClient = isServerOrClient;
390 ++channelId; // Use different value for serverForClient&client in per process
391 }
392 uv_tcp_init(loopMain, &hChannel->hWorkTCP);
393 ++hChannel->uvHandleRef;
394 hChannel->hWorkThread = uv_thread_self();
395 hChannel->hWorkTCP.data = hChannel;
396 hChannel->clsChannel = this;
397 hChannel->channelId = channelId;
398 (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
399 AdminChannel(OP_ADD, channelId, hChannel);
400 *hOutChannel = hChannel;
401 if (isServerOrClient) {
402 WRITE_LOG(LOG_INFO, "Mallocchannel:%u", channelId);
403 } else {
404 WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId);
405 }
406 return channelId;
407 }
408
409 // work when libuv-handle at struct of HdcSession has all callback finished
FreeChannelFinally(uv_idle_t *handle)410 void HdcChannelBase::FreeChannelFinally(uv_idle_t *handle)
411 {
412 HChannel hChannel = (HChannel)handle->data;
413 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
414 if (hChannel->uvHandleRef > 0) {
415 if (hChannel->serverOrClient) {
416 WRITE_LOG(LOG_INFO, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
417 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
418 } else {
419 WRITE_LOG(LOG_DEBUG, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
420 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
421 }
422 return;
423 }
424 thisClass->NotifyInstanceChannelFree(hChannel);
425 thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr);
426
427 if (!hChannel->serverOrClient) {
428 WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u sid:%u finish",
429 hChannel->channelId, hChannel->targetSessionId);
430 uv_stop(thisClass->loopMain);
431 } else {
432 WRITE_LOG(LOG_INFO, "!!!FreeChannelFinally channelId:%u sid:%u finish",
433 hChannel->channelId, hChannel->targetSessionId);
434 }
435 #ifdef HDC_HOST
436 Base::TryCloseHandle((const uv_handle_t *)&hChannel->hChildWorkTCP);
437 #endif
438 delete hChannel;
439 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
440 }
441
FreeChannelContinue(HChannel hChannel)442 void HdcChannelBase::FreeChannelContinue(HChannel hChannel)
443 {
444 auto closeChannelHandle = [](uv_handle_t *handle) -> void {
445 if (handle->data == nullptr) {
446 WRITE_LOG(LOG_DEBUG, "FreeChannelContinue handle->data is nullptr");
447 return;
448 }
449 HChannel channel = reinterpret_cast<HChannel>(handle->data);
450 --channel->uvHandleRef;
451 Base::TryCloseHandle((uv_handle_t *)handle);
452 };
453 hChannel->availTailIndex = 0;
454 if (hChannel->ioBuf) {
455 delete[] hChannel->ioBuf;
456 hChannel->ioBuf = nullptr;
457 }
458 if (!hChannel->serverOrClient) {
459 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle);
460 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle);
461 }
462 if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
463 --hChannel->uvHandleRef;
464 } else {
465 Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
466 }
467 Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally);
468 }
469
FreeChannelOpeate(uv_timer_t *handle)470 void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
471 {
472 HChannel hChannel = (HChannel)handle->data;
473 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
474 if (hChannel->ref > 0) {
475 return;
476 }
477 thisClass->DispMntnInfo(hChannel);
478 if (hChannel->hChildWorkTCP.loop) {
479 auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
480 bool ret = thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
481 if (!ret) {
482 WRITE_LOG(LOG_WARN, "FreeChannelOpeate deatch failed channelId:%u sid:%u",
483 hChannel->channelId, hChannel->targetSessionId);
484 hChannel->childCleared = true;
485 }
486 auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
487 HChannel hChannel = (HChannel)handle->data;
488 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
489 if (!hChannel->childCleared) {
490 WRITE_LOG(LOG_WARN, "FreeChannelOpeate childCleared:%d channelId:%u sid:%u",
491 hChannel->childCleared, hChannel->channelId, hChannel->targetSessionId);
492 return;
493 }
494 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
495 thisClass->FreeChannelContinue(hChannel);
496 };
497 Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
498 } else {
499 thisClass->FreeChannelContinue(hChannel);
500 }
501 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
502 }
503
FreeChannel(const uint32_t channelId)504 void HdcChannelBase::FreeChannel(const uint32_t channelId)
505 {
506 if (threadChanneMain != uv_thread_self()) {
507 PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0);
508 WRITE_LOG(LOG_INFO, "FreeChannel not uv_thread_self channelid:%u", channelId);
509 return;
510 }
511 HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr);
512 do {
513 if (!hChannel || hChannel->isDead) {
514 WRITE_LOG(LOG_WARN, "FreeChannel hChannel nullptr or isDead channelid:%u", channelId);
515 break;
516 }
517 WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId);
518 Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT); // do immediately
519 hChannel->isDead = true;
520 } while (false);
521 }
522
AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)523 HChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)
524 {
525 HChannel hRet = nullptr;
526 switch (op) {
527 case OP_ADD:
528 uv_rwlock_wrlock(&lockMapChannel);
529 mapChannel[channelId] = hInput;
530 uv_rwlock_wrunlock(&lockMapChannel);
531 break;
532 case OP_REMOVE:
533 uv_rwlock_wrlock(&lockMapChannel);
534 mapChannel.erase(channelId);
535 uv_rwlock_wrunlock(&lockMapChannel);
536 break;
537 case OP_QUERY:
538 uv_rwlock_rdlock(&lockMapChannel);
539 if (mapChannel.count(channelId)) {
540 hRet = mapChannel[channelId];
541 }
542 uv_rwlock_rdunlock(&lockMapChannel);
543 break;
544 case OP_QUERY_REF:
545 uv_rwlock_wrlock(&lockMapChannel);
546 if (mapChannel.count(channelId)) {
547 hRet = mapChannel[channelId];
548 ++hRet->ref;
549 }
550 uv_rwlock_wrunlock(&lockMapChannel);
551 break;
552 case OP_UPDATE:
553 uv_rwlock_wrlock(&lockMapChannel);
554 // remove old
555 mapChannel.erase(channelId);
556 mapChannel[hInput->channelId] = hInput;
557 uv_rwlock_wrunlock(&lockMapChannel);
558 break;
559 default:
560 break;
561 }
562 return hRet;
563 }
564
EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)565 void HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)
566 {
567 StartTraceScope("HdcChannelBase::EchoToClient");
568 uv_stream_t *sendStream = nullptr;
569 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
570 auto data = new uint8_t[sizeNewBuf]();
571 if (!data) {
572 return;
573 }
574 *reinterpret_cast<uint32_t *>(data) = htonl(size);
575 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
576 delete[] data;
577 return;
578 }
579 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
580 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
581 ++hChannel->ref;
582 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
583 } else {
584 WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId);
585 delete[] data;
586 }
587 }
588
EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)589 void HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)
590 {
591 for (auto v : mapChannel) {
592 HChannel hChannel = (HChannel)v.second;
593 if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) {
594 WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str());
595 EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size());
596 }
597 }
598 }
599
DispMntnInfo(HChannel hChannel)600 void HdcChannelBase::DispMntnInfo(HChannel hChannel)
601 {
602 if (!hChannel) {
603 WRITE_LOG(LOG_WARN, "prt is null");
604 return;
605 }
606 WRITE_LOG(LOG_DEBUG, "channel info: id:%u isDead:%d ref:%u, writeFailedTimes:%u",
607 hChannel->channelId, hChannel->isDead, uint32_t(hChannel->ref), uint32_t(hChannel->writeFailedTimes));
608 }
609 }
610