1bae4d13cSopenharmony_ci/*
2bae4d13cSopenharmony_ci * Copyright (c) 2023 Huawei Device Co., Ltd.
3bae4d13cSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4bae4d13cSopenharmony_ci * you may not use this file except in compliance with the License.
5bae4d13cSopenharmony_ci * You may obtain a copy of the License at
6bae4d13cSopenharmony_ci *
7bae4d13cSopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8bae4d13cSopenharmony_ci *
9bae4d13cSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10bae4d13cSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11bae4d13cSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12bae4d13cSopenharmony_ci * See the License for the specific language governing permissions and
13bae4d13cSopenharmony_ci * limitations under the License.
14bae4d13cSopenharmony_ci */
15bae4d13cSopenharmony_ci
#include "stream_socket.h"

#include <cerrno>
#include <cinttypes>

#include "sensor_errors.h"
21bae4d13cSopenharmony_ci
22bae4d13cSopenharmony_cinamespace OHOS {
23bae4d13cSopenharmony_cinamespace Sensors {
24bae4d13cSopenharmony_ci#ifndef OHOS_BUILD_ENABLE_RUST
25bae4d13cSopenharmony_ci#undef LOG_TAG
26bae4d13cSopenharmony_ci#define LOG_TAG "StreamSocket"
27bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
28bae4d13cSopenharmony_ci
29bae4d13cSopenharmony_ciStreamSocket::StreamSocket() {}
30bae4d13cSopenharmony_ci
31bae4d13cSopenharmony_ciStreamSocket::~StreamSocket()
32bae4d13cSopenharmony_ci{
33bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
34bae4d13cSopenharmony_ci    StreamSocketClose(streamSocketPtr_.get());
35bae4d13cSopenharmony_ci#else
36bae4d13cSopenharmony_ci    Close();
37bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
38bae4d13cSopenharmony_ci}
39bae4d13cSopenharmony_ci
40bae4d13cSopenharmony_ci#ifndef OHOS_BUILD_ENABLE_RUST
41bae4d13cSopenharmony_civoid StreamSocket::OnReadPackets(CircleStreamBuffer &circBuf, StreamSocket::PacketCallBackFun callbackFun)
42bae4d13cSopenharmony_ci{
43bae4d13cSopenharmony_ci    constexpr size_t headSize = sizeof(PackHead);
44bae4d13cSopenharmony_ci    for (size_t i = 0; i < ONCE_PROCESS_NETPACKET_LIMIT; ++i) {
45bae4d13cSopenharmony_ci        const size_t unreadSize = circBuf.UnreadSize();
46bae4d13cSopenharmony_ci        if (unreadSize < headSize) {
47bae4d13cSopenharmony_ci            break;
48bae4d13cSopenharmony_ci        }
49bae4d13cSopenharmony_ci        size_t dataSize = unreadSize - headSize;
50bae4d13cSopenharmony_ci        char *buf = const_cast<char *>(circBuf.ReadBuf());
51bae4d13cSopenharmony_ci        CHKPB(buf);
52bae4d13cSopenharmony_ci        PackHead *head = reinterpret_cast<PackHead *>(buf);
53bae4d13cSopenharmony_ci        CHKPB(head);
54bae4d13cSopenharmony_ci        if (head->size < 0 || head->size > MAX_PACKET_BUF_SIZE) {
55bae4d13cSopenharmony_ci            SEN_HILOGE("Packet header parsing error, and this error cannot be recovered. The buffer will be reset"
56bae4d13cSopenharmony_ci                " head->size:%{public}zu, unreadSize:%{public}zu", head->size, unreadSize);
57bae4d13cSopenharmony_ci            circBuf.Reset();
58bae4d13cSopenharmony_ci            break;
59bae4d13cSopenharmony_ci        }
60bae4d13cSopenharmony_ci        if (head->size > dataSize) {
61bae4d13cSopenharmony_ci            break;
62bae4d13cSopenharmony_ci        }
63bae4d13cSopenharmony_ci        NetPacket pkt(head->idMsg);
64bae4d13cSopenharmony_ci        if ((head->size > 0) && (!pkt.Write(&buf[headSize], head->size))) {
65bae4d13cSopenharmony_ci            SEN_HILOGW("Error writing data in the NetPacket. It will be retried next time. messageid:%{public}d,"
66bae4d13cSopenharmony_ci                "size:%{public}zu", head->idMsg, head->size);
67bae4d13cSopenharmony_ci            break;
68bae4d13cSopenharmony_ci        }
69bae4d13cSopenharmony_ci        if (!circBuf.SeekReadPos(pkt.GetPacketLength())) {
70bae4d13cSopenharmony_ci            SEN_HILOGW("Set read position error, and this error cannot be recovered, and the buffer will be reset"
71bae4d13cSopenharmony_ci                " packetSize:%{public}zu, unreadSize:%{public}zu", pkt.GetPacketLength(), unreadSize);
72bae4d13cSopenharmony_ci            circBuf.Reset();
73bae4d13cSopenharmony_ci            break;
74bae4d13cSopenharmony_ci        }
75bae4d13cSopenharmony_ci        callbackFun(pkt);
76bae4d13cSopenharmony_ci        if (circBuf.IsEmpty()) {
77bae4d13cSopenharmony_ci            circBuf.Reset();
78bae4d13cSopenharmony_ci            break;
79bae4d13cSopenharmony_ci        }
80bae4d13cSopenharmony_ci    }
81bae4d13cSopenharmony_ci}
82bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
83bae4d13cSopenharmony_ci
84bae4d13cSopenharmony_civoid StreamSocket::Close()
85bae4d13cSopenharmony_ci{
86bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
87bae4d13cSopenharmony_ci    StreamSocketClose(streamSocketPtr_.get());
88bae4d13cSopenharmony_ci#else
89bae4d13cSopenharmony_ci    if (fd_ >= 0) {
90bae4d13cSopenharmony_ci        auto rf = close(fd_);
91bae4d13cSopenharmony_ci        if (rf != 0) {
92bae4d13cSopenharmony_ci            SEN_HILOGE("Socket close failed, rf:%{public}d", rf);
93bae4d13cSopenharmony_ci        }
94bae4d13cSopenharmony_ci    }
95bae4d13cSopenharmony_ci    fd_ = -1;
96bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
97bae4d13cSopenharmony_ci}
98bae4d13cSopenharmony_ci
99bae4d13cSopenharmony_ciint32_t StreamSocket::GetFd() const
100bae4d13cSopenharmony_ci{
101bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
102bae4d13cSopenharmony_ci    return StreamSocketGetFd(streamSocketPtr_.get());
103bae4d13cSopenharmony_ci#else
104bae4d13cSopenharmony_ci    return fd_;
105bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
106bae4d13cSopenharmony_ci}
107bae4d13cSopenharmony_ci} // namespace Sensors
108bae4d13cSopenharmony_ci} // namespace OHOS