1cc290419Sopenharmony_ci/*
2cc290419Sopenharmony_ci * Copyright (C) 2021 Huawei Device Co., Ltd.
3cc290419Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4cc290419Sopenharmony_ci * you may not use this file except in compliance with the License.
5cc290419Sopenharmony_ci * You may obtain a copy of the License at
6cc290419Sopenharmony_ci *
7cc290419Sopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8cc290419Sopenharmony_ci *
9cc290419Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10cc290419Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11cc290419Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12cc290419Sopenharmony_ci * See the License for the specific language governing permissions and
13cc290419Sopenharmony_ci * limitations under the License.
14cc290419Sopenharmony_ci */
15cc290419Sopenharmony_ci#include "channel.h"
16cc290419Sopenharmony_cinamespace Hdc {
17cc290419Sopenharmony_ciHdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)
18cc290419Sopenharmony_ci{
19cc290419Sopenharmony_ci    SetChannelTCPString(addrString);
20cc290419Sopenharmony_ci    isServerOrClient = serverOrClient;
21cc290419Sopenharmony_ci    loopMain = loopMainIn;
22cc290419Sopenharmony_ci    threadChanneMain = uv_thread_self();
23cc290419Sopenharmony_ci    uv_rwlock_init(&mainAsync);
24cc290419Sopenharmony_ci    uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback);
25cc290419Sopenharmony_ci    uv_rwlock_init(&lockMapChannel);
26cc290419Sopenharmony_ci}
27cc290419Sopenharmony_ci
28cc290419Sopenharmony_ciHdcChannelBase::~HdcChannelBase()
29cc290419Sopenharmony_ci{
30cc290419Sopenharmony_ci    ClearChannels();
31cc290419Sopenharmony_ci    // clear
32cc290419Sopenharmony_ci    if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
33cc290419Sopenharmony_ci        uv_close((uv_handle_t *)&asyncMainLoop, nullptr);
34cc290419Sopenharmony_ci    }
35cc290419Sopenharmony_ci
36cc290419Sopenharmony_ci    uv_rwlock_destroy(&mainAsync);
37cc290419Sopenharmony_ci    uv_rwlock_destroy(&lockMapChannel);
38cc290419Sopenharmony_ci}
39cc290419Sopenharmony_ci
40cc290419Sopenharmony_civector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const
41cc290419Sopenharmony_ci{
42cc290419Sopenharmony_ci    vector<uint8_t> ret;
43cc290419Sopenharmony_ci    struct ChannelHandShake handshake = {};
44cc290419Sopenharmony_ci    if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) {
45cc290419Sopenharmony_ci        return ret;
46cc290419Sopenharmony_ci    }
47cc290419Sopenharmony_ci    if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) {
48cc290419Sopenharmony_ci        return ret;
49cc290419Sopenharmony_ci    }
50cc290419Sopenharmony_ci    ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake));
51cc290419Sopenharmony_ci    return ret;
52cc290419Sopenharmony_ci}
53cc290419Sopenharmony_ci
54cc290419Sopenharmony_cibool HdcChannelBase::SetChannelTCPString(const string &addrString)
55cc290419Sopenharmony_ci{
56cc290419Sopenharmony_ci    bool ret = false;
57cc290419Sopenharmony_ci    while (true) {
58cc290419Sopenharmony_ci        if (addrString.find(":") == string::npos) {
59cc290419Sopenharmony_ci            break;
60cc290419Sopenharmony_ci        }
61cc290419Sopenharmony_ci        std::size_t found = addrString.find_last_of(":");
62cc290419Sopenharmony_ci        if (found == string::npos) {
63cc290419Sopenharmony_ci            break;
64cc290419Sopenharmony_ci        }
65cc290419Sopenharmony_ci
66cc290419Sopenharmony_ci        string host = addrString.substr(0, found);
67cc290419Sopenharmony_ci        string port = addrString.substr(found + 1);
68cc290419Sopenharmony_ci
69cc290419Sopenharmony_ci        channelPort = std::atoi(port.c_str());
70cc290419Sopenharmony_ci        sockaddr_in addrv4;
71cc290419Sopenharmony_ci        sockaddr_in6 addrv6;
72cc290419Sopenharmony_ci        if (!channelPort) {
73cc290419Sopenharmony_ci            break;
74cc290419Sopenharmony_ci        }
75cc290419Sopenharmony_ci
76cc290419Sopenharmony_ci        if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 &&
77cc290419Sopenharmony_ci            uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) {
78cc290419Sopenharmony_ci            break;
79cc290419Sopenharmony_ci        }
80cc290419Sopenharmony_ci        channelHost = host;
81cc290419Sopenharmony_ci        channelHostPort = addrString;
82cc290419Sopenharmony_ci        ret = true;
83cc290419Sopenharmony_ci        break;
84cc290419Sopenharmony_ci    }
85cc290419Sopenharmony_ci    if (!ret) {
86cc290419Sopenharmony_ci        channelPort = 0;
87cc290419Sopenharmony_ci        channelHost = STRING_EMPTY;
88cc290419Sopenharmony_ci        channelHostPort = STRING_EMPTY;
89cc290419Sopenharmony_ci    }
90cc290419Sopenharmony_ci    return ret;
91cc290419Sopenharmony_ci}
92cc290419Sopenharmony_ci
93cc290419Sopenharmony_civoid HdcChannelBase::ClearChannels()
94cc290419Sopenharmony_ci{
95cc290419Sopenharmony_ci    for (auto v : mapChannel) {
96cc290419Sopenharmony_ci        HChannel hChannel = (HChannel)v.second;
97cc290419Sopenharmony_ci        if (!hChannel->isDead) {
98cc290419Sopenharmony_ci            FreeChannel(hChannel->channelId);
99cc290419Sopenharmony_ci        }
100cc290419Sopenharmony_ci    }
101cc290419Sopenharmony_ci}
102cc290419Sopenharmony_ci
103cc290419Sopenharmony_civoid HdcChannelBase::WorkerPendding()
104cc290419Sopenharmony_ci{
105cc290419Sopenharmony_ci    WRITE_LOG(LOG_DEBUG, "Begin host channel pendding");
106cc290419Sopenharmony_ci    uv_run(loopMain, UV_RUN_DEFAULT);
107cc290419Sopenharmony_ci    uv_loop_close(loopMain);
108cc290419Sopenharmony_ci}
109cc290419Sopenharmony_ci
110cc290419Sopenharmony_civoid HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
111cc290419Sopenharmony_ci{
112cc290419Sopenharmony_ci    StartTraceScope("HdcChannelBase::ReadStream");
113cc290419Sopenharmony_ci    int size = 0;
114cc290419Sopenharmony_ci    int indexBuf = 0;
115cc290419Sopenharmony_ci    int childRet = 0;
116cc290419Sopenharmony_ci    bool needExit = false;
117cc290419Sopenharmony_ci    HChannel hChannel = (HChannel)tcp->data;
118cc290419Sopenharmony_ci    HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
119cc290419Sopenharmony_ci    uint32_t channelId = hChannel->channelId;
120cc290419Sopenharmony_ci
121cc290419Sopenharmony_ci    if (nread == UV_ENOBUFS) {
122cc290419Sopenharmony_ci        WRITE_LOG(LOG_FATAL, "ReadStream nobufs channelId:%u", channelId);
123cc290419Sopenharmony_ci        return;
124cc290419Sopenharmony_ci    } else if (nread == 0) {
125cc290419Sopenharmony_ci        // maybe just after accept, second client req
126cc290419Sopenharmony_ci        WRITE_LOG(LOG_DEBUG, "ReadStream idle read channelId:%u", channelId);
127cc290419Sopenharmony_ci        return;
128cc290419Sopenharmony_ci    } else if (nread < 0) {
129cc290419Sopenharmony_ci        Base::TryCloseHandle((uv_handle_t *)tcp);
130cc290419Sopenharmony_ci        constexpr int bufSize = 1024;
131cc290419Sopenharmony_ci        char buffer[bufSize] = { 0 };
132cc290419Sopenharmony_ci        uv_err_name_r(nread, buffer, bufSize);
133cc290419Sopenharmony_ci        WRITE_LOG(LOG_DEBUG, "ReadStream channelId:%u failed:%s", channelId, buffer);
134cc290419Sopenharmony_ci        needExit = true;
135cc290419Sopenharmony_ci        goto Finish;
136cc290419Sopenharmony_ci    } else {
137cc290419Sopenharmony_ci        hChannel->availTailIndex += nread;
138cc290419Sopenharmony_ci    }
139cc290419Sopenharmony_ci    while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) {
140cc290419Sopenharmony_ci        size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf));  // big endian
141cc290419Sopenharmony_ci        if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) {
142cc290419Sopenharmony_ci            WRITE_LOG(LOG_FATAL, "ReadStream size:%d channelId:%u", size, channelId);
143cc290419Sopenharmony_ci            needExit = true;
144cc290419Sopenharmony_ci            break;
145cc290419Sopenharmony_ci        }
146cc290419Sopenharmony_ci        if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) {
147cc290419Sopenharmony_ci            break;
148cc290419Sopenharmony_ci        }
149cc290419Sopenharmony_ci        childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) +
150cc290419Sopenharmony_ci                                          DWORD_SERIALIZE_SIZE + indexBuf, size);
151cc290419Sopenharmony_ci        if (childRet < 0) {
152cc290419Sopenharmony_ci            WRITE_LOG(LOG_WARN, "ReadStream childRet:%d channelId:%u keepAlive:%d",
153cc290419Sopenharmony_ci                childRet, channelId, hChannel->keepAlive);
154cc290419Sopenharmony_ci            if (!hChannel->keepAlive) {
155cc290419Sopenharmony_ci                needExit = true;
156cc290419Sopenharmony_ci                break;
157cc290419Sopenharmony_ci            }
158cc290419Sopenharmony_ci        }
159cc290419Sopenharmony_ci        // update io
160cc290419Sopenharmony_ci        hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size);
161cc290419Sopenharmony_ci        indexBuf += DWORD_SERIALIZE_SIZE + size;
162cc290419Sopenharmony_ci    }
163cc290419Sopenharmony_ci    if (indexBuf > 0 && hChannel->availTailIndex > 0) {
164cc290419Sopenharmony_ci        if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) {
165cc290419Sopenharmony_ci            needExit = true;
166cc290419Sopenharmony_ci            goto Finish;
167cc290419Sopenharmony_ci        }
168cc290419Sopenharmony_ci    }
169cc290419Sopenharmony_ci
170cc290419Sopenharmony_ciFinish:
171cc290419Sopenharmony_ci    if (needExit) {
172cc290419Sopenharmony_ci        thisClass->FreeChannel(hChannel->channelId);
173cc290419Sopenharmony_ci        WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish channelId:%u", channelId);
174cc290419Sopenharmony_ci    }
175cc290419Sopenharmony_ci}
176cc290419Sopenharmony_ci
177cc290419Sopenharmony_civoid HdcChannelBase::WriteCallback(uv_write_t *req, int status)
178cc290419Sopenharmony_ci{
179cc290419Sopenharmony_ci    HChannel hChannel = (HChannel)req->handle->data;
180cc290419Sopenharmony_ci    --hChannel->ref;
181cc290419Sopenharmony_ci    HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
182cc290419Sopenharmony_ci    if (status < 0) {
183cc290419Sopenharmony_ci        hChannel->writeFailedTimes++;
184cc290419Sopenharmony_ci        Base::TryCloseHandle((uv_handle_t *)req->handle);
185cc290419Sopenharmony_ci        if (!hChannel->isDead && !hChannel->ref) {
186cc290419Sopenharmony_ci            thisClass->FreeChannel(hChannel->channelId);
187cc290419Sopenharmony_ci        }
188cc290419Sopenharmony_ci    }
189cc290419Sopenharmony_ci    delete[]((uint8_t *)req->data);
190cc290419Sopenharmony_ci    delete req;
191cc290419Sopenharmony_ci}
192cc290419Sopenharmony_ci
193cc290419Sopenharmony_civoid HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle)
194cc290419Sopenharmony_ci{
195cc290419Sopenharmony_ci    AsyncParam *param = (AsyncParam *)handle->data;
196cc290419Sopenharmony_ci    HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass;
197cc290419Sopenharmony_ci
198cc290419Sopenharmony_ci    switch (param->method) {
199cc290419Sopenharmony_ci        case ASYNC_FREE_CHANNEL: {
200cc290419Sopenharmony_ci            // alloc/release should pair in main thread.
201cc290419Sopenharmony_ci            thisClass->FreeChannel(param->sid);
202cc290419Sopenharmony_ci            break;
203cc290419Sopenharmony_ci        }
204cc290419Sopenharmony_ci        default:
205cc290419Sopenharmony_ci            break;
206cc290419Sopenharmony_ci    }
207cc290419Sopenharmony_ci    if (param->data) {
208cc290419Sopenharmony_ci        delete[]((uint8_t *)param->data);
209cc290419Sopenharmony_ci    }
210cc290419Sopenharmony_ci    delete param;
211cc290419Sopenharmony_ci    uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
212cc290419Sopenharmony_ci}
213cc290419Sopenharmony_ci
214cc290419Sopenharmony_ci// multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected.
215cc290419Sopenharmony_ci// eg: if uv_async_send() 5 times before callback calling,it will be called only once.
216cc290419Sopenharmony_ci// if uv_async_send() is called again after callback calling, it will be called again.
217cc290419Sopenharmony_civoid HdcChannelBase::MainAsyncCallback(uv_async_t *handle)
218cc290419Sopenharmony_ci{
219cc290419Sopenharmony_ci    HdcChannelBase *thisClass = (HdcChannelBase *)handle->data;
220cc290419Sopenharmony_ci    if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) {
221cc290419Sopenharmony_ci        WRITE_LOG(LOG_WARN, "MainAsyncCallback uv_is_closing loopMain");
222cc290419Sopenharmony_ci        return;
223cc290419Sopenharmony_ci    }
224cc290419Sopenharmony_ci    list<void *>::iterator i;
225cc290419Sopenharmony_ci    list<void *> &lst = thisClass->lstMainThreadOP;
226cc290419Sopenharmony_ci    uv_rwlock_wrlock(&thisClass->mainAsync);
227cc290419Sopenharmony_ci    for (i = lst.begin(); i != lst.end();) {
228cc290419Sopenharmony_ci        AsyncParam *param = (AsyncParam *)*i;
229cc290419Sopenharmony_ci        Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask);
230cc290419Sopenharmony_ci        i = lst.erase(i);
231cc290419Sopenharmony_ci    }
232cc290419Sopenharmony_ci    uv_rwlock_wrunlock(&thisClass->mainAsync);
233cc290419Sopenharmony_ci}
234cc290419Sopenharmony_ci
235cc290419Sopenharmony_civoid HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data,
236cc290419Sopenharmony_ci                                      const int dataSize)
237cc290419Sopenharmony_ci{
238cc290419Sopenharmony_ci    if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
239cc290419Sopenharmony_ci        WRITE_LOG(LOG_WARN, "PushAsyncMessage uv_is_closing asyncMainLoop");
240cc290419Sopenharmony_ci        return;
241cc290419Sopenharmony_ci    }
242cc290419Sopenharmony_ci    auto param = new AsyncParam();
243cc290419Sopenharmony_ci    if (!param) {
244cc290419Sopenharmony_ci        return;
245cc290419Sopenharmony_ci    }
246cc290419Sopenharmony_ci    param->sid = channelId;  // Borrow SID storage
247cc290419Sopenharmony_ci    param->thisClass = this;
248cc290419Sopenharmony_ci    param->method = method;
249cc290419Sopenharmony_ci    if (dataSize > 0) {
250cc290419Sopenharmony_ci        param->dataSize = dataSize;
251cc290419Sopenharmony_ci        param->data = new uint8_t[param->dataSize]();
252cc290419Sopenharmony_ci        if (!param->data) {
253cc290419Sopenharmony_ci            delete param;
254cc290419Sopenharmony_ci            return;
255cc290419Sopenharmony_ci        }
256cc290419Sopenharmony_ci        if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
257cc290419Sopenharmony_ci            delete[]((uint8_t *)param->data);
258cc290419Sopenharmony_ci            delete param;
259cc290419Sopenharmony_ci            return;
260cc290419Sopenharmony_ci        }
261cc290419Sopenharmony_ci    }
262cc290419Sopenharmony_ci    asyncMainLoop.data = this;
263cc290419Sopenharmony_ci    uv_rwlock_wrlock(&mainAsync);
264cc290419Sopenharmony_ci    lstMainThreadOP.push_back(param);
265cc290419Sopenharmony_ci    uv_rwlock_wrunlock(&mainAsync);
266cc290419Sopenharmony_ci    uv_async_send(&asyncMainLoop);
267cc290419Sopenharmony_ci}
268cc290419Sopenharmony_ci
269cc290419Sopenharmony_ci// add commandflag ahead real buf data
270cc290419Sopenharmony_civoid HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
271cc290419Sopenharmony_ci{
272cc290419Sopenharmony_ci    StartTraceScope("HdcChannelBase::SendChannelWithCmd");
273cc290419Sopenharmony_ci    auto data = new uint8_t[size + sizeof(commandFlag)]();
274cc290419Sopenharmony_ci    if (!data) {
275cc290419Sopenharmony_ci        return;
276cc290419Sopenharmony_ci    }
277cc290419Sopenharmony_ci
278cc290419Sopenharmony_ci    if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) {
279cc290419Sopenharmony_ci        delete[] data;
280cc290419Sopenharmony_ci        return;
281cc290419Sopenharmony_ci    }
282cc290419Sopenharmony_ci
283cc290419Sopenharmony_ci    if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) {
284cc290419Sopenharmony_ci        delete[] data;
285cc290419Sopenharmony_ci        return;
286cc290419Sopenharmony_ci    }
287cc290419Sopenharmony_ci
288cc290419Sopenharmony_ci    SendChannel(hChannel, data, size + sizeof(commandFlag));
289cc290419Sopenharmony_ci    delete[] data;
290cc290419Sopenharmony_ci}
291cc290419Sopenharmony_ci
292cc290419Sopenharmony_civoid HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
293cc290419Sopenharmony_ci{
294cc290419Sopenharmony_ci    StartTraceScope("HdcChannelBase::SendWithCmd");
295cc290419Sopenharmony_ci    HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
296cc290419Sopenharmony_ci    if (!hChannel) {
297cc290419Sopenharmony_ci        WRITE_LOG(LOG_FATAL, "SendWithCmd hChannel nullptr channelId:%u", channelId);
298cc290419Sopenharmony_ci        return;
299cc290419Sopenharmony_ci    }
300cc290419Sopenharmony_ci    do {
301cc290419Sopenharmony_ci        if (hChannel->isDead) {
302cc290419Sopenharmony_ci            WRITE_LOG(LOG_FATAL, "SendWithCmd isDead channelId:%u", channelId);
303cc290419Sopenharmony_ci            break;
304cc290419Sopenharmony_ci        }
305cc290419Sopenharmony_ci        SendChannelWithCmd(hChannel, commandFlag, bufPtr, size);
306cc290419Sopenharmony_ci    } while (false);
307cc290419Sopenharmony_ci    --hChannel->ref;
308cc290419Sopenharmony_ci}
309cc290419Sopenharmony_ci
310cc290419Sopenharmony_civoid HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size)
311cc290419Sopenharmony_ci{
312cc290419Sopenharmony_ci    StartTraceScope("HdcChannelBase::SendChannel");
313cc290419Sopenharmony_ci    uv_stream_t *sendStream = nullptr;
314cc290419Sopenharmony_ci    int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
315cc290419Sopenharmony_ci    auto data = new uint8_t[sizeNewBuf]();
316cc290419Sopenharmony_ci    if (!data) {
317cc290419Sopenharmony_ci        return;
318cc290419Sopenharmony_ci    }
319cc290419Sopenharmony_ci    *reinterpret_cast<uint32_t *>(data) = htonl(size);  // big endian
320cc290419Sopenharmony_ci    if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
321cc290419Sopenharmony_ci        delete[] data;
322cc290419Sopenharmony_ci        return;
323cc290419Sopenharmony_ci    }
324cc290419Sopenharmony_ci    if (hChannel->hWorkThread == uv_thread_self()) {
325cc290419Sopenharmony_ci        sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
326cc290419Sopenharmony_ci    } else {
327cc290419Sopenharmony_ci        sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
328cc290419Sopenharmony_ci    }
329cc290419Sopenharmony_ci    if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
330cc290419Sopenharmony_ci        ++hChannel->ref;
331cc290419Sopenharmony_ci        Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
332cc290419Sopenharmony_ci    } else {
333cc290419Sopenharmony_ci        delete[] data;
334cc290419Sopenharmony_ci    }
335cc290419Sopenharmony_ci}
336cc290419Sopenharmony_ci
337cc290419Sopenharmony_ci// works only in current working thread
338cc290419Sopenharmony_civoid HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size)
339cc290419Sopenharmony_ci{
340cc290419Sopenharmony_ci    StartTraceScope("HdcChannelBase::Send");
341cc290419Sopenharmony_ci    HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
342cc290419Sopenharmony_ci    if (!hChannel) {
343cc290419Sopenharmony_ci        WRITE_LOG(LOG_FATAL, "Send hChannel nullptr channelId:%u", channelId);
344cc290419Sopenharmony_ci        return;
345cc290419Sopenharmony_ci    }
346cc290419Sopenharmony_ci    do {
347cc290419Sopenharmony_ci        if (hChannel->isDead) {
348cc290419Sopenharmony_ci            WRITE_LOG(LOG_FATAL, "Send isDead channelId:%u", channelId);
349cc290419Sopenharmony_ci            break;
350cc290419Sopenharmony_ci        }
351cc290419Sopenharmony_ci        SendChannel(hChannel, bufPtr, size);
352cc290419Sopenharmony_ci    } while (false);
353cc290419Sopenharmony_ci    --hChannel->ref;
354cc290419Sopenharmony_ci}
355cc290419Sopenharmony_ci
356cc290419Sopenharmony_civoid HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
357cc290419Sopenharmony_ci{
358cc290419Sopenharmony_ci    HChannel context = (HChannel)handle->data;
359cc290419Sopenharmony_ci    Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * BUF_EXTEND_SIZE);
360cc290419Sopenharmony_ci    buf->base = (char *)context->ioBuf + context->availTailIndex;
361cc290419Sopenharmony_ci    int size = context->bufSize - context->availTailIndex;
362cc290419Sopenharmony_ci    buf->len = std::min(size, static_cast<int>(sizeWanted));
363cc290419Sopenharmony_ci}
364cc290419Sopenharmony_ci
365cc290419Sopenharmony_ciuint32_t HdcChannelBase::GetChannelPseudoUid()
366cc290419Sopenharmony_ci{
367cc290419Sopenharmony_ci    uint32_t uid = 0;
368cc290419Sopenharmony_ci    do {
369cc290419Sopenharmony_ci        uid = Base::GetSecureRandom();
370cc290419Sopenharmony_ci    } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr);
371cc290419Sopenharmony_ci    return uid;
372cc290419Sopenharmony_ci}
373cc290419Sopenharmony_ci
374cc290419Sopenharmony_ciuint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
375cc290419Sopenharmony_ci{
376cc290419Sopenharmony_ci#ifdef CONFIG_USE_JEMALLOC_DFX_INIF
377cc290419Sopenharmony_ci    mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
378cc290419Sopenharmony_ci    mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
379cc290419Sopenharmony_ci#endif
380cc290419Sopenharmony_ci    auto hChannel = new HdcChannel();
381cc290419Sopenharmony_ci    if (!hChannel) {
382cc290419Sopenharmony_ci        WRITE_LOG(LOG_FATAL, "malloc channel failed");
383cc290419Sopenharmony_ci        return 0;
384cc290419Sopenharmony_ci    }
385cc290419Sopenharmony_ci    hChannel->stdinTty.data = nullptr;
386cc290419Sopenharmony_ci    hChannel->stdoutTty.data = nullptr;
387cc290419Sopenharmony_ci    uint32_t channelId = GetChannelPseudoUid();
388cc290419Sopenharmony_ci    if (isServerOrClient) {
389cc290419Sopenharmony_ci        hChannel->serverOrClient = isServerOrClient;
390cc290419Sopenharmony_ci        ++channelId;  // Use different value for serverForClient&client in per process
391cc290419Sopenharmony_ci    }
392cc290419Sopenharmony_ci    uv_tcp_init(loopMain, &hChannel->hWorkTCP);
393cc290419Sopenharmony_ci    ++hChannel->uvHandleRef;
394cc290419Sopenharmony_ci    hChannel->hWorkThread = uv_thread_self();
395cc290419Sopenharmony_ci    hChannel->hWorkTCP.data = hChannel;
396cc290419Sopenharmony_ci    hChannel->clsChannel = this;
397cc290419Sopenharmony_ci    hChannel->channelId = channelId;
398cc290419Sopenharmony_ci    (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
399cc290419Sopenharmony_ci    AdminChannel(OP_ADD, channelId, hChannel);
400cc290419Sopenharmony_ci    *hOutChannel = hChannel;
401cc290419Sopenharmony_ci    if (isServerOrClient) {
402cc290419Sopenharmony_ci        WRITE_LOG(LOG_INFO, "Mallocchannel:%u", channelId);
403cc290419Sopenharmony_ci    } else {
404cc290419Sopenharmony_ci        WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId);
405cc290419Sopenharmony_ci    }
406cc290419Sopenharmony_ci    return channelId;
407cc290419Sopenharmony_ci}
408cc290419Sopenharmony_ci
409cc290419Sopenharmony_ci// work when libuv-handle at struct of HdcSession has all callback finished
410cc290419Sopenharmony_civoid HdcChannelBase::FreeChannelFinally(uv_idle_t *handle)
411cc290419Sopenharmony_ci{
412cc290419Sopenharmony_ci    HChannel hChannel = (HChannel)handle->data;
413cc290419Sopenharmony_ci    HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
414cc290419Sopenharmony_ci    if (hChannel->uvHandleRef > 0) {
415cc290419Sopenharmony_ci        if (hChannel->serverOrClient) {
416cc290419Sopenharmony_ci            WRITE_LOG(LOG_INFO, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
417cc290419Sopenharmony_ci                hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
418cc290419Sopenharmony_ci        } else {
419cc290419Sopenharmony_ci            WRITE_LOG(LOG_DEBUG, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
420cc290419Sopenharmony_ci                hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
421cc290419Sopenharmony_ci        }
422cc290419Sopenharmony_ci        return;
423cc290419Sopenharmony_ci    }
424cc290419Sopenharmony_ci    thisClass->NotifyInstanceChannelFree(hChannel);
425cc290419Sopenharmony_ci    thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr);
426cc290419Sopenharmony_ci
427cc290419Sopenharmony_ci    if (!hChannel->serverOrClient) {
428cc290419Sopenharmony_ci        WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u sid:%u finish",
429cc290419Sopenharmony_ci            hChannel->channelId, hChannel->targetSessionId);
430cc290419Sopenharmony_ci        uv_stop(thisClass->loopMain);
431cc290419Sopenharmony_ci    } else {
432cc290419Sopenharmony_ci        WRITE_LOG(LOG_INFO, "!!!FreeChannelFinally channelId:%u sid:%u finish",
433cc290419Sopenharmony_ci            hChannel->channelId, hChannel->targetSessionId);
434cc290419Sopenharmony_ci    }
435cc290419Sopenharmony_ci#ifdef HDC_HOST
436cc290419Sopenharmony_ci    Base::TryCloseHandle((const uv_handle_t *)&hChannel->hChildWorkTCP);
437cc290419Sopenharmony_ci#endif
438cc290419Sopenharmony_ci    delete hChannel;
439cc290419Sopenharmony_ci    Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
440cc290419Sopenharmony_ci}
441cc290419Sopenharmony_ci
442cc290419Sopenharmony_civoid HdcChannelBase::FreeChannelContinue(HChannel hChannel)
443cc290419Sopenharmony_ci{
444cc290419Sopenharmony_ci    auto closeChannelHandle = [](uv_handle_t *handle) -> void {
445cc290419Sopenharmony_ci        if (handle->data == nullptr) {
446cc290419Sopenharmony_ci            WRITE_LOG(LOG_DEBUG, "FreeChannelContinue handle->data is nullptr");
447cc290419Sopenharmony_ci            return;
448cc290419Sopenharmony_ci        }
449cc290419Sopenharmony_ci        HChannel channel = reinterpret_cast<HChannel>(handle->data);
450cc290419Sopenharmony_ci        --channel->uvHandleRef;
451cc290419Sopenharmony_ci        Base::TryCloseHandle((uv_handle_t *)handle);
452cc290419Sopenharmony_ci    };
453cc290419Sopenharmony_ci    hChannel->availTailIndex = 0;
454cc290419Sopenharmony_ci    if (hChannel->ioBuf) {
455cc290419Sopenharmony_ci        delete[] hChannel->ioBuf;
456cc290419Sopenharmony_ci        hChannel->ioBuf = nullptr;
457cc290419Sopenharmony_ci    }
458cc290419Sopenharmony_ci    if (!hChannel->serverOrClient) {
459cc290419Sopenharmony_ci        Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle);
460cc290419Sopenharmony_ci        Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle);
461cc290419Sopenharmony_ci    }
462cc290419Sopenharmony_ci    if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
463cc290419Sopenharmony_ci        --hChannel->uvHandleRef;
464cc290419Sopenharmony_ci    } else {
465cc290419Sopenharmony_ci        Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
466cc290419Sopenharmony_ci    }
467cc290419Sopenharmony_ci    Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally);
468cc290419Sopenharmony_ci}
469cc290419Sopenharmony_ci
470cc290419Sopenharmony_civoid HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
471cc290419Sopenharmony_ci{
472cc290419Sopenharmony_ci    HChannel hChannel = (HChannel)handle->data;
473cc290419Sopenharmony_ci    HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
474cc290419Sopenharmony_ci    if (hChannel->ref > 0) {
475cc290419Sopenharmony_ci        return;
476cc290419Sopenharmony_ci    }
477cc290419Sopenharmony_ci    thisClass->DispMntnInfo(hChannel);
478cc290419Sopenharmony_ci    if (hChannel->hChildWorkTCP.loop) {
479cc290419Sopenharmony_ci        auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
480cc290419Sopenharmony_ci        bool ret = thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
481cc290419Sopenharmony_ci        if (!ret) {
482cc290419Sopenharmony_ci            WRITE_LOG(LOG_WARN, "FreeChannelOpeate deatch failed channelId:%u sid:%u",
483cc290419Sopenharmony_ci                hChannel->channelId, hChannel->targetSessionId);
484cc290419Sopenharmony_ci            hChannel->childCleared = true;
485cc290419Sopenharmony_ci        }
486cc290419Sopenharmony_ci        auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
487cc290419Sopenharmony_ci            HChannel hChannel = (HChannel)handle->data;
488cc290419Sopenharmony_ci            HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
489cc290419Sopenharmony_ci            if (!hChannel->childCleared) {
490cc290419Sopenharmony_ci                WRITE_LOG(LOG_WARN, "FreeChannelOpeate childCleared:%d channelId:%u sid:%u",
491cc290419Sopenharmony_ci                    hChannel->childCleared, hChannel->channelId, hChannel->targetSessionId);
492cc290419Sopenharmony_ci                return;
493cc290419Sopenharmony_ci            }
494cc290419Sopenharmony_ci            Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
495cc290419Sopenharmony_ci            thisClass->FreeChannelContinue(hChannel);
496cc290419Sopenharmony_ci        };
497cc290419Sopenharmony_ci        Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
498cc290419Sopenharmony_ci    } else {
499cc290419Sopenharmony_ci        thisClass->FreeChannelContinue(hChannel);
500cc290419Sopenharmony_ci    }
501cc290419Sopenharmony_ci    Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
502cc290419Sopenharmony_ci}
503cc290419Sopenharmony_ci
504cc290419Sopenharmony_civoid HdcChannelBase::FreeChannel(const uint32_t channelId)
505cc290419Sopenharmony_ci{
506cc290419Sopenharmony_ci    if (threadChanneMain != uv_thread_self()) {
507cc290419Sopenharmony_ci        PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0);
508cc290419Sopenharmony_ci        WRITE_LOG(LOG_INFO, "FreeChannel not uv_thread_self channelid:%u", channelId);
509cc290419Sopenharmony_ci        return;
510cc290419Sopenharmony_ci    }
511cc290419Sopenharmony_ci    HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr);
512cc290419Sopenharmony_ci    do {
513cc290419Sopenharmony_ci        if (!hChannel || hChannel->isDead) {
514cc290419Sopenharmony_ci            WRITE_LOG(LOG_WARN, "FreeChannel hChannel nullptr or isDead channelid:%u", channelId);
515cc290419Sopenharmony_ci            break;
516cc290419Sopenharmony_ci        }
517cc290419Sopenharmony_ci        WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId);
518cc290419Sopenharmony_ci        Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT);  // do immediately
519cc290419Sopenharmony_ci        hChannel->isDead = true;
520cc290419Sopenharmony_ci    } while (false);
521cc290419Sopenharmony_ci}
522cc290419Sopenharmony_ci
523cc290419Sopenharmony_ciHChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)
524cc290419Sopenharmony_ci{
525cc290419Sopenharmony_ci    HChannel hRet = nullptr;
526cc290419Sopenharmony_ci    switch (op) {
527cc290419Sopenharmony_ci        case OP_ADD:
528cc290419Sopenharmony_ci            uv_rwlock_wrlock(&lockMapChannel);
529cc290419Sopenharmony_ci            mapChannel[channelId] = hInput;
530cc290419Sopenharmony_ci            uv_rwlock_wrunlock(&lockMapChannel);
531cc290419Sopenharmony_ci            break;
532cc290419Sopenharmony_ci        case OP_REMOVE:
533cc290419Sopenharmony_ci            uv_rwlock_wrlock(&lockMapChannel);
534cc290419Sopenharmony_ci            mapChannel.erase(channelId);
535cc290419Sopenharmony_ci            uv_rwlock_wrunlock(&lockMapChannel);
536cc290419Sopenharmony_ci            break;
537cc290419Sopenharmony_ci        case OP_QUERY:
538cc290419Sopenharmony_ci            uv_rwlock_rdlock(&lockMapChannel);
539cc290419Sopenharmony_ci            if (mapChannel.count(channelId)) {
540cc290419Sopenharmony_ci                hRet = mapChannel[channelId];
541cc290419Sopenharmony_ci            }
542cc290419Sopenharmony_ci            uv_rwlock_rdunlock(&lockMapChannel);
543cc290419Sopenharmony_ci            break;
544cc290419Sopenharmony_ci        case OP_QUERY_REF:
545cc290419Sopenharmony_ci            uv_rwlock_wrlock(&lockMapChannel);
546cc290419Sopenharmony_ci            if (mapChannel.count(channelId)) {
547cc290419Sopenharmony_ci                hRet = mapChannel[channelId];
548cc290419Sopenharmony_ci                ++hRet->ref;
549cc290419Sopenharmony_ci            }
550cc290419Sopenharmony_ci            uv_rwlock_wrunlock(&lockMapChannel);
551cc290419Sopenharmony_ci            break;
552cc290419Sopenharmony_ci        case OP_UPDATE:
553cc290419Sopenharmony_ci            uv_rwlock_wrlock(&lockMapChannel);
554cc290419Sopenharmony_ci            // remove old
555cc290419Sopenharmony_ci            mapChannel.erase(channelId);
556cc290419Sopenharmony_ci            mapChannel[hInput->channelId] = hInput;
557cc290419Sopenharmony_ci            uv_rwlock_wrunlock(&lockMapChannel);
558cc290419Sopenharmony_ci            break;
559cc290419Sopenharmony_ci        default:
560cc290419Sopenharmony_ci            break;
561cc290419Sopenharmony_ci    }
562cc290419Sopenharmony_ci    return hRet;
563cc290419Sopenharmony_ci}
564cc290419Sopenharmony_ci
565cc290419Sopenharmony_civoid HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)
566cc290419Sopenharmony_ci{
567cc290419Sopenharmony_ci    StartTraceScope("HdcChannelBase::EchoToClient");
568cc290419Sopenharmony_ci    uv_stream_t *sendStream = nullptr;
569cc290419Sopenharmony_ci    int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
570cc290419Sopenharmony_ci    auto data = new uint8_t[sizeNewBuf]();
571cc290419Sopenharmony_ci    if (!data) {
572cc290419Sopenharmony_ci        return;
573cc290419Sopenharmony_ci    }
574cc290419Sopenharmony_ci    *reinterpret_cast<uint32_t *>(data) = htonl(size);
575cc290419Sopenharmony_ci    if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
576cc290419Sopenharmony_ci        delete[] data;
577cc290419Sopenharmony_ci        return;
578cc290419Sopenharmony_ci    }
579cc290419Sopenharmony_ci    sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
580cc290419Sopenharmony_ci    if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
581cc290419Sopenharmony_ci        ++hChannel->ref;
582cc290419Sopenharmony_ci        Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
583cc290419Sopenharmony_ci    } else {
584cc290419Sopenharmony_ci        WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId);
585cc290419Sopenharmony_ci        delete[] data;
586cc290419Sopenharmony_ci    }
587cc290419Sopenharmony_ci}
588cc290419Sopenharmony_ci
589cc290419Sopenharmony_civoid HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)
590cc290419Sopenharmony_ci{
591cc290419Sopenharmony_ci    for (auto v : mapChannel) {
592cc290419Sopenharmony_ci        HChannel hChannel = (HChannel)v.second;
593cc290419Sopenharmony_ci        if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) {
594cc290419Sopenharmony_ci            WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str());
595cc290419Sopenharmony_ci            EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size());
596cc290419Sopenharmony_ci        }
597cc290419Sopenharmony_ci    }
598cc290419Sopenharmony_ci}
599cc290419Sopenharmony_ci
600cc290419Sopenharmony_civoid HdcChannelBase::DispMntnInfo(HChannel hChannel)
601cc290419Sopenharmony_ci{
602cc290419Sopenharmony_ci    if (!hChannel) {
603cc290419Sopenharmony_ci        WRITE_LOG(LOG_WARN, "prt is null");
604cc290419Sopenharmony_ci        return;
605cc290419Sopenharmony_ci    }
606cc290419Sopenharmony_ci    WRITE_LOG(LOG_DEBUG, "channel info: id:%u isDead:%d ref:%u, writeFailedTimes:%u",
607cc290419Sopenharmony_ci        hChannel->channelId, hChannel->isDead, uint32_t(hChannel->ref), uint32_t(hChannel->writeFailedTimes));
608cc290419Sopenharmony_ci}
609cc290419Sopenharmony_ci}
610