1/*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "stream_socket.h"
17
18#include <cinttypes>
19
20#include "sensor_errors.h"
21
22namespace OHOS {
23namespace Sensors {
24#ifndef OHOS_BUILD_ENABLE_RUST
25#undef LOG_TAG
26#define LOG_TAG "StreamSocket"
27#endif // OHOS_BUILD_ENABLE_RUST
28
29StreamSocket::StreamSocket() {}
30
31StreamSocket::~StreamSocket()
32{
33#ifdef OHOS_BUILD_ENABLE_RUST
34    StreamSocketClose(streamSocketPtr_.get());
35#else
36    Close();
37#endif // OHOS_BUILD_ENABLE_RUST
38}
39
40#ifndef OHOS_BUILD_ENABLE_RUST
41void StreamSocket::OnReadPackets(CircleStreamBuffer &circBuf, StreamSocket::PacketCallBackFun callbackFun)
42{
43    constexpr size_t headSize = sizeof(PackHead);
44    for (size_t i = 0; i < ONCE_PROCESS_NETPACKET_LIMIT; ++i) {
45        const size_t unreadSize = circBuf.UnreadSize();
46        if (unreadSize < headSize) {
47            break;
48        }
49        size_t dataSize = unreadSize - headSize;
50        char *buf = const_cast<char *>(circBuf.ReadBuf());
51        CHKPB(buf);
52        PackHead *head = reinterpret_cast<PackHead *>(buf);
53        CHKPB(head);
54        if (head->size < 0 || head->size > MAX_PACKET_BUF_SIZE) {
55            SEN_HILOGE("Packet header parsing error, and this error cannot be recovered. The buffer will be reset"
56                " head->size:%{public}zu, unreadSize:%{public}zu", head->size, unreadSize);
57            circBuf.Reset();
58            break;
59        }
60        if (head->size > dataSize) {
61            break;
62        }
63        NetPacket pkt(head->idMsg);
64        if ((head->size > 0) && (!pkt.Write(&buf[headSize], head->size))) {
65            SEN_HILOGW("Error writing data in the NetPacket. It will be retried next time. messageid:%{public}d,"
66                "size:%{public}zu", head->idMsg, head->size);
67            break;
68        }
69        if (!circBuf.SeekReadPos(pkt.GetPacketLength())) {
70            SEN_HILOGW("Set read position error, and this error cannot be recovered, and the buffer will be reset"
71                " packetSize:%{public}zu, unreadSize:%{public}zu", pkt.GetPacketLength(), unreadSize);
72            circBuf.Reset();
73            break;
74        }
75        callbackFun(pkt);
76        if (circBuf.IsEmpty()) {
77            circBuf.Reset();
78            break;
79        }
80    }
81}
82#endif // OHOS_BUILD_ENABLE_RUST
83
84void StreamSocket::Close()
85{
86#ifdef OHOS_BUILD_ENABLE_RUST
87    StreamSocketClose(streamSocketPtr_.get());
88#else
89    if (fd_ >= 0) {
90        auto rf = close(fd_);
91        if (rf != 0) {
92            SEN_HILOGE("Socket close failed, rf:%{public}d", rf);
93        }
94    }
95    fd_ = -1;
96#endif // OHOS_BUILD_ENABLE_RUST
97}
98
99int32_t StreamSocket::GetFd() const
100{
101#ifdef OHOS_BUILD_ENABLE_RUST
102    return StreamSocketGetFd(streamSocketPtr_.get());
103#else
104    return fd_;
105#endif // OHOS_BUILD_ENABLE_RUST
106}
107} // namespace Sensors
108} // namespace OHOS