1/* 2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "stream_client.h" 17 18#include "include/util.h" 19 20#undef LOG_TAG 21#define LOG_TAG "StreamClient" 22 23namespace OHOS { 24namespace Msdp { 25namespace DeviceStatus { 26 27StreamClient::StreamClient() 28{ 29 CALL_DEBUG_ENTER; 30} 31 32StreamClient::~StreamClient() 33{ 34 CALL_DEBUG_ENTER; 35} 36 37int32_t StreamClient::StartConnect() 38{ 39 CALL_DEBUG_ENTER; 40 if (Socket() < 0) { 41 FI_HILOGE("Socket failed"); 42 return RET_ERR; 43 } 44 OnConnected(); 45 return RET_OK; 46} 47 48bool StreamClient::SendMsg(const char *buf, size_t size) const 49{ 50 CHKPF(buf); 51 if ((size == 0) || (size > MAX_PACKET_BUF_SIZE)) { 52 FI_HILOGE("Stream buffer size out of range"); 53 return false; 54 } 55 if (fd_ < 0) { 56 FI_HILOGE("The fd_ is less than 0"); 57 return false; 58 } 59 60 int32_t retryCount = 0; 61 int32_t idx = 0; 62 const int32_t bufSize = static_cast<int32_t>(size); 63 int32_t remSize = bufSize; 64 while (remSize > 0 && retryCount < SEND_RETRY_LIMIT) { 65 retryCount += 1; 66 ssize_t number = send(fd_, &buf[idx], remSize, MSG_DONTWAIT | MSG_NOSIGNAL); 67 if (number < 0) { 68 if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) { 69 FI_HILOGW("Continue for errno EAGAIN|EINTR|EWOULDBLOCK, errno:%{public}d", errno); 70 continue; 71 } 72 FI_HILOGE("Send return failed, error:%{public}d, fd:%{public}d", errno, fd_); 73 return false; 74 } 75 idx += number; 76 remSize -= number; 77 if (remSize > 0) { 78 usleep(SEND_RETRY_SLEEP_TIME); 79 } 80 } 81 if (retryCount >= SEND_RETRY_LIMIT || remSize != 0) { 82 FI_HILOGE("Too many times to send :%{public}d/%{public}d, size:%{public}d/%{public}d, fd:%{public}d", 83 retryCount, SEND_RETRY_LIMIT, idx, bufSize, fd_); 84 return false; 85 } 86 return true; 87} 88 89bool StreamClient::SendMsg(const NetPacket &pkt) const 90{ 91 if (pkt.ChkRWError()) { 92 FI_HILOGE("Read and write status is error"); 93 return false; 94 } 95 StreamBuffer buf; 96 if (!pkt.MakeData(buf)) { 97 FI_HILOGE("Failed to buffer packet"); 98 return false; 99 } 100 return SendMsg(buf.Data(), buf.Size()); 101} 102 103bool StreamClient::StartClient(MsgClientFunCallback fun) 104{ 105 CALL_DEBUG_ENTER; 106 if (isRunning_ || hasConnected_) { 107 FI_HILOGE("Client is connected or started"); 108 return false; 109 } 110 hasClient_ = true; 111 recvFun_ = fun; 112 if (StartConnect() < 0) { 113 FI_HILOGW("Client connection failed, Try again later"); 114 } 115 return true; 116} 117 118void StreamClient::Stop() 119{ 120 CALL_DEBUG_ENTER; 121 hasClient_ = false; 122 Close(); 123} 124} // namespace DeviceStatus 125} // namespace Msdp 126} // namespace OHOS