1/*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "stream_session.h"
17
18#include <cinttypes>
19#include <sstream>
20
21#include "proto.h"
22#include "sensor_errors.h"
23#include "stream_socket.h"
24
25#undef LOG_TAG
26#define LOG_TAG "StreamSession"
27
28namespace OHOS {
29namespace Sensors {
30
31StreamSession::StreamSession(const std::string &programName, const int32_t fd, const int32_t uid, const int32_t pid)
32    : programName_(programName)
33#ifdef OHOS_BUILD_ENABLE_RUST
34{
35    StreamSessionSetFd(streamSessionPtr_.get(), fd);
36    StreamSessionSetUid(streamSessionPtr_.get(), uid);
37    StreamSessionSetPid(streamSessionPtr_.get(), pid);
38    UpdateDescript();
39}
40#else
41,
42      fd_(fd),
43      uid_(uid),
44      pid_(pid)
45{
46    UpdateDescript();
47}
48#endif // OHOS_BUILD_ENABLE_RUST
49
50
51bool StreamSession::SendMsg(const char *buf, size_t size) const
52{
53#ifdef OHOS_BUILD_ENABLE_RUST
54    return StreamSessionSendMsg(streamSessionPtr_.get(), buf, size);
55#else
56    CHKPF(buf);
57    if ((size == 0) || (size > MAX_PACKET_BUF_SIZE)) {
58        SEN_HILOGE("buf size:%{public}zu", size);
59        return false;
60    }
61    if (fd_ < 0) {
62        SEN_HILOGE("The fd_ is less than 0");
63        return false;
64    }
65    size_t idx = 0;
66    size_t retryCount = 0;
67    size_t remSize = size;
68    while (remSize > 0 && retryCount < SEND_RETRY_LIMIT) {
69        ++retryCount;
70        auto count = send(fd_, &buf[idx], remSize, MSG_DONTWAIT | MSG_NOSIGNAL);
71        if (count < 0) {
72            if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) {
73#ifdef OHOS_BUILD_ENABLE_RUST
74                sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME));
75#else
76                usleep(SEND_RETRY_SLEEP_TIME);
77#endif
78                SEN_HILOGW("Continue for errno EAGAIN|EINTR|EWOULDBLOCK, errno:%{public}d", errno);
79                continue;
80            }
81            SEN_HILOGE("Send return failed, error:%{public}d, fd:%{public}d", errno, fd_);
82            return false;
83        }
84        idx += static_cast<size_t>(count);
85        remSize -= static_cast<size_t>(count);
86        if (remSize > 0) {
87#ifdef OHOS_BUILD_ENABLE_RUST
88            sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME));
89#else
90            usleep(SEND_RETRY_SLEEP_TIME);
91#endif
92        }
93    }
94    if (retryCount >= SEND_RETRY_LIMIT || remSize != 0) {
95        SEN_HILOGE("Send too many times:%{public}zu/%{public}zu, size:%{public}zu/%{public}zu, fd:%{public}d",
96            retryCount, SEND_RETRY_LIMIT, idx, size, fd_);
97        return false;
98    }
99    return true;
100#endif // OHOS_BUILD_ENABLE_RUST
101}
102
103void StreamSession::Close()
104{
105#ifdef OHOS_BUILD_ENABLE_RUST
106    StreamSessionClose(streamSessionPtr_.get());
107    UpdateDescript();
108#else
109    if (fd_ >= 0) {
110        close(fd_);
111        fd_ = -1;
112        UpdateDescript();
113    }
114#endif // OHOS_BUILD_ENABLE_RUST
115}
116
117void StreamSession::UpdateDescript()
118{
119#ifdef OHOS_BUILD_ENABLE_RUST
120    std::ostringstream oss;
121    oss << "fd = " << StreamSessionGetFd(streamSessionPtr_.get())
122        << ", programName = " << programName_
123        << ", moduleType = " << StreamSessionGetModuleType(streamSessionPtr_.get())
124        << ((StreamSessionGetFd(streamSessionPtr_.get()) < 0) ? ", closed" : ", opened")
125        << ", uid = " << StreamSessionGetUid(streamSessionPtr_.get())
126        << ", pid = " << StreamSessionGetPid(streamSessionPtr_.get())
127        << ", tokenType = " << StreamSessionGetTokenType(streamSessionPtr_.get())
128        << std::endl;
129    descript_ = oss.str().c_str();
130#else
131    std::ostringstream oss;
132    oss << "fd = " << fd_
133        << ", programName = " << programName_
134        << ((fd_ < 0) ? ", closed" : ", opened")
135        << ", uid = " << uid_
136        << ", pid = " << pid_
137        << ", tokenType = " << tokenType_
138        << std::endl;
139    descript_ = oss.str().c_str();
140#endif // OHOS_BUILD_ENABLE_RUST
141}
142
143bool StreamSession::SendMsg(const NetPacket &pkt) const
144{
145#ifdef OHOS_BUILD_ENABLE_RUST
146    if (StreamBufferChkRWError(pkt.streamBufferPtr_.get())) {
147        SEN_HILOGE("Read and write status is error");
148        return false;
149    }
150    StreamBuffer buf;
151    pkt.MakeData(buf);
152    return SendMsg(StreamBufferData(buf.streamBufferPtr_.get()), StreamBufferSize(buf.streamBufferPtr_.get()));
153#else
154    if (pkt.ChkRWError()) {
155        SEN_HILOGE("Read and write status failed");
156        return false;
157    }
158    StreamBuffer buf;
159    pkt.MakeData(buf);
160    return SendMsg(buf.Data(), buf.Size());
161#endif // OHOS_BUILD_ENABLE_RUST
162}
163
164int32_t StreamSession::GetUid() const
165{
166#ifdef OHOS_BUILD_ENABLE_RUST
167    return StreamSessionGetUid(streamSessionPtr_.get());
168#else
169    return uid_;
170#endif // OHOS_BUILD_ENABLE_RUST
171}
172
173int32_t StreamSession::GetPid() const
174{
175#ifdef OHOS_BUILD_ENABLE_RUST
176    return StreamSessionGetPid(streamSessionPtr_.get());
177#else
178    return pid_;
179#endif // OHOS_BUILD_ENABLE_RUST
180}
181
182SessionPtr StreamSession::GetSharedPtr()
183{
184    return shared_from_this();
185}
186
187int32_t StreamSession::GetFd() const
188{
189#ifdef OHOS_BUILD_ENABLE_RUST
190    return StreamSessionGetFd(streamSessionPtr_.get());
191#else
192    return fd_;
193#endif // OHOS_BUILD_ENABLE_RUST
194}
195
196const std::string &StreamSession::GetDescript() const
197{
198    return descript_;
199}
200
201const std::string StreamSession::GetProgramName() const
202{
203    return programName_;
204}
205
206void StreamSession::SetTokenType(int32_t type)
207{
208#ifdef OHOS_BUILD_ENABLE_RUST
209    StreamSessionSetTokenType(streamSessionPtr_.get(), type);
210#else
211    tokenType_ = type;
212#endif // OHOS_BUILD_ENABLE_RUST
213}
214
215int32_t StreamSession::GetTokenType() const
216{
217#ifdef OHOS_BUILD_ENABLE_RUST
218    return StreamSessionGetTokenType(streamSessionPtr_.get());
219#else
220    return tokenType_;
221#endif // OHOS_BUILD_ENABLE_RUST
222}
223} // namespace Sensors
224} // namespace OHOS