1/*
2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "stream_client.h"
17
18#include "include/util.h"
19
20#undef LOG_TAG
21#define LOG_TAG "StreamClient"
22
23namespace OHOS {
24namespace Msdp {
25namespace DeviceStatus {
26
27StreamClient::StreamClient()
28{
29    CALL_DEBUG_ENTER;
30}
31
32StreamClient::~StreamClient()
33{
34    CALL_DEBUG_ENTER;
35}
36
37int32_t StreamClient::StartConnect()
38{
39    CALL_DEBUG_ENTER;
40    if (Socket() < 0) {
41        FI_HILOGE("Socket failed");
42        return RET_ERR;
43    }
44    OnConnected();
45    return RET_OK;
46}
47
48bool StreamClient::SendMsg(const char *buf, size_t size) const
49{
50    CHKPF(buf);
51    if ((size == 0) || (size > MAX_PACKET_BUF_SIZE)) {
52        FI_HILOGE("Stream buffer size out of range");
53        return false;
54    }
55    if (fd_ < 0) {
56        FI_HILOGE("The fd_ is less than 0");
57        return false;
58    }
59
60    int32_t retryCount = 0;
61    int32_t idx = 0;
62    const int32_t bufSize = static_cast<int32_t>(size);
63    int32_t remSize = bufSize;
64    while (remSize > 0 && retryCount < SEND_RETRY_LIMIT) {
65        retryCount += 1;
66        ssize_t number = send(fd_, &buf[idx], remSize, MSG_DONTWAIT | MSG_NOSIGNAL);
67        if (number < 0) {
68            if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) {
69                FI_HILOGW("Continue for errno EAGAIN|EINTR|EWOULDBLOCK, errno:%{public}d", errno);
70                continue;
71            }
72            FI_HILOGE("Send return failed, error:%{public}d, fd:%{public}d", errno, fd_);
73            return false;
74        }
75        idx += number;
76        remSize -= number;
77        if (remSize > 0) {
78            usleep(SEND_RETRY_SLEEP_TIME);
79        }
80    }
81    if (retryCount >= SEND_RETRY_LIMIT || remSize != 0) {
82        FI_HILOGE("Too many times to send :%{public}d/%{public}d, size:%{public}d/%{public}d, fd:%{public}d",
83            retryCount, SEND_RETRY_LIMIT, idx, bufSize, fd_);
84        return false;
85    }
86    return true;
87}
88
89bool StreamClient::SendMsg(const NetPacket &pkt) const
90{
91    if (pkt.ChkRWError()) {
92        FI_HILOGE("Read and write status is error");
93        return false;
94    }
95    StreamBuffer buf;
96    if (!pkt.MakeData(buf)) {
97        FI_HILOGE("Failed to buffer packet");
98        return false;
99    }
100    return SendMsg(buf.Data(), buf.Size());
101}
102
103bool StreamClient::StartClient(MsgClientFunCallback fun)
104{
105    CALL_DEBUG_ENTER;
106    if (isRunning_ || hasConnected_) {
107        FI_HILOGE("Client is connected or started");
108        return false;
109    }
110    hasClient_ = true;
111    recvFun_ = fun;
112    if (StartConnect() < 0) {
113        FI_HILOGW("Client connection failed, Try again later");
114    }
115    return true;
116}
117
118void StreamClient::Stop()
119{
120    CALL_DEBUG_ENTER;
121    hasClient_ = false;
122    Close();
123}
124} // namespace DeviceStatus
125} // namespace Msdp
126} // namespace OHOS