1/* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "stream_socket.h" 17 18#include <cinttypes> 19 20#include "sensor_errors.h" 21 22namespace OHOS { 23namespace Sensors { 24#ifndef OHOS_BUILD_ENABLE_RUST 25#undef LOG_TAG 26#define LOG_TAG "StreamSocket" 27#endif // OHOS_BUILD_ENABLE_RUST 28 29StreamSocket::StreamSocket() {} 30 31StreamSocket::~StreamSocket() 32{ 33#ifdef OHOS_BUILD_ENABLE_RUST 34 StreamSocketClose(streamSocketPtr_.get()); 35#else 36 Close(); 37#endif // OHOS_BUILD_ENABLE_RUST 38} 39 40#ifndef OHOS_BUILD_ENABLE_RUST 41void StreamSocket::OnReadPackets(CircleStreamBuffer &circBuf, StreamSocket::PacketCallBackFun callbackFun) 42{ 43 constexpr size_t headSize = sizeof(PackHead); 44 for (size_t i = 0; i < ONCE_PROCESS_NETPACKET_LIMIT; ++i) { 45 const size_t unreadSize = circBuf.UnreadSize(); 46 if (unreadSize < headSize) { 47 break; 48 } 49 size_t dataSize = unreadSize - headSize; 50 char *buf = const_cast<char *>(circBuf.ReadBuf()); 51 CHKPB(buf); 52 PackHead *head = reinterpret_cast<PackHead *>(buf); 53 CHKPB(head); 54 if (head->size < 0 || head->size > MAX_PACKET_BUF_SIZE) { 55 SEN_HILOGE("Packet header parsing error, and this error cannot be recovered. The buffer will be reset" 56 " head->size:%{public}zu, unreadSize:%{public}zu", head->size, unreadSize); 57 circBuf.Reset(); 58 break; 59 } 60 if (head->size > dataSize) { 61 break; 62 } 63 NetPacket pkt(head->idMsg); 64 if ((head->size > 0) && (!pkt.Write(&buf[headSize], head->size))) { 65 SEN_HILOGW("Error writing data in the NetPacket. It will be retried next time. messageid:%{public}d," 66 "size:%{public}zu", head->idMsg, head->size); 67 break; 68 } 69 if (!circBuf.SeekReadPos(pkt.GetPacketLength())) { 70 SEN_HILOGW("Set read position error, and this error cannot be recovered, and the buffer will be reset" 71 " packetSize:%{public}zu, unreadSize:%{public}zu", pkt.GetPacketLength(), unreadSize); 72 circBuf.Reset(); 73 break; 74 } 75 callbackFun(pkt); 76 if (circBuf.IsEmpty()) { 77 circBuf.Reset(); 78 break; 79 } 80 } 81} 82#endif // OHOS_BUILD_ENABLE_RUST 83 84void StreamSocket::Close() 85{ 86#ifdef OHOS_BUILD_ENABLE_RUST 87 StreamSocketClose(streamSocketPtr_.get()); 88#else 89 if (fd_ >= 0) { 90 auto rf = close(fd_); 91 if (rf != 0) { 92 SEN_HILOGE("Socket close failed, rf:%{public}d", rf); 93 } 94 } 95 fd_ = -1; 96#endif // OHOS_BUILD_ENABLE_RUST 97} 98 99int32_t StreamSocket::GetFd() const 100{ 101#ifdef OHOS_BUILD_ENABLE_RUST 102 return StreamSocketGetFd(streamSocketPtr_.get()); 103#else 104 return fd_; 105#endif // OHOS_BUILD_ENABLE_RUST 106} 107} // namespace Sensors 108} // namespace OHOS