1bae4d13cSopenharmony_ci/*
2bae4d13cSopenharmony_ci * Copyright (c) 2023 Huawei Device Co., Ltd.
3bae4d13cSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4bae4d13cSopenharmony_ci * you may not use this file except in compliance with the License.
5bae4d13cSopenharmony_ci * You may obtain a copy of the License at
6bae4d13cSopenharmony_ci *
7bae4d13cSopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8bae4d13cSopenharmony_ci *
9bae4d13cSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10bae4d13cSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11bae4d13cSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12bae4d13cSopenharmony_ci * See the License for the specific language governing permissions and
13bae4d13cSopenharmony_ci * limitations under the License.
14bae4d13cSopenharmony_ci */
15bae4d13cSopenharmony_ci
16bae4d13cSopenharmony_ci#include "stream_session.h"
17bae4d13cSopenharmony_ci
18bae4d13cSopenharmony_ci#include <cinttypes>
19bae4d13cSopenharmony_ci#include <sstream>
20bae4d13cSopenharmony_ci
21bae4d13cSopenharmony_ci#include "proto.h"
22bae4d13cSopenharmony_ci#include "sensor_errors.h"
23bae4d13cSopenharmony_ci#include "stream_socket.h"
24bae4d13cSopenharmony_ci
25bae4d13cSopenharmony_ci#undef LOG_TAG
26bae4d13cSopenharmony_ci#define LOG_TAG "StreamSession"
27bae4d13cSopenharmony_ci
28bae4d13cSopenharmony_cinamespace OHOS {
29bae4d13cSopenharmony_cinamespace Sensors {
30bae4d13cSopenharmony_ci
31bae4d13cSopenharmony_ciStreamSession::StreamSession(const std::string &programName, const int32_t fd, const int32_t uid, const int32_t pid)
32bae4d13cSopenharmony_ci    : programName_(programName)
33bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
34bae4d13cSopenharmony_ci{
35bae4d13cSopenharmony_ci    StreamSessionSetFd(streamSessionPtr_.get(), fd);
36bae4d13cSopenharmony_ci    StreamSessionSetUid(streamSessionPtr_.get(), uid);
37bae4d13cSopenharmony_ci    StreamSessionSetPid(streamSessionPtr_.get(), pid);
38bae4d13cSopenharmony_ci    UpdateDescript();
39bae4d13cSopenharmony_ci}
40bae4d13cSopenharmony_ci#else
41bae4d13cSopenharmony_ci,
42bae4d13cSopenharmony_ci      fd_(fd),
43bae4d13cSopenharmony_ci      uid_(uid),
44bae4d13cSopenharmony_ci      pid_(pid)
45bae4d13cSopenharmony_ci{
46bae4d13cSopenharmony_ci    UpdateDescript();
47bae4d13cSopenharmony_ci}
48bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
49bae4d13cSopenharmony_ci
50bae4d13cSopenharmony_ci
51bae4d13cSopenharmony_cibool StreamSession::SendMsg(const char *buf, size_t size) const
52bae4d13cSopenharmony_ci{
53bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
54bae4d13cSopenharmony_ci    return StreamSessionSendMsg(streamSessionPtr_.get(), buf, size);
55bae4d13cSopenharmony_ci#else
56bae4d13cSopenharmony_ci    CHKPF(buf);
57bae4d13cSopenharmony_ci    if ((size == 0) || (size > MAX_PACKET_BUF_SIZE)) {
58bae4d13cSopenharmony_ci        SEN_HILOGE("buf size:%{public}zu", size);
59bae4d13cSopenharmony_ci        return false;
60bae4d13cSopenharmony_ci    }
61bae4d13cSopenharmony_ci    if (fd_ < 0) {
62bae4d13cSopenharmony_ci        SEN_HILOGE("The fd_ is less than 0");
63bae4d13cSopenharmony_ci        return false;
64bae4d13cSopenharmony_ci    }
65bae4d13cSopenharmony_ci    size_t idx = 0;
66bae4d13cSopenharmony_ci    size_t retryCount = 0;
67bae4d13cSopenharmony_ci    size_t remSize = size;
68bae4d13cSopenharmony_ci    while (remSize > 0 && retryCount < SEND_RETRY_LIMIT) {
69bae4d13cSopenharmony_ci        ++retryCount;
70bae4d13cSopenharmony_ci        auto count = send(fd_, &buf[idx], remSize, MSG_DONTWAIT | MSG_NOSIGNAL);
71bae4d13cSopenharmony_ci        if (count < 0) {
72bae4d13cSopenharmony_ci            if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) {
73bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
74bae4d13cSopenharmony_ci                sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME));
75bae4d13cSopenharmony_ci#else
76bae4d13cSopenharmony_ci                usleep(SEND_RETRY_SLEEP_TIME);
77bae4d13cSopenharmony_ci#endif
78bae4d13cSopenharmony_ci                SEN_HILOGW("Continue for errno EAGAIN|EINTR|EWOULDBLOCK, errno:%{public}d", errno);
79bae4d13cSopenharmony_ci                continue;
80bae4d13cSopenharmony_ci            }
81bae4d13cSopenharmony_ci            SEN_HILOGE("Send return failed, error:%{public}d, fd:%{public}d", errno, fd_);
82bae4d13cSopenharmony_ci            return false;
83bae4d13cSopenharmony_ci        }
84bae4d13cSopenharmony_ci        idx += static_cast<size_t>(count);
85bae4d13cSopenharmony_ci        remSize -= static_cast<size_t>(count);
86bae4d13cSopenharmony_ci        if (remSize > 0) {
87bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
88bae4d13cSopenharmony_ci            sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME));
89bae4d13cSopenharmony_ci#else
90bae4d13cSopenharmony_ci            usleep(SEND_RETRY_SLEEP_TIME);
91bae4d13cSopenharmony_ci#endif
92bae4d13cSopenharmony_ci        }
93bae4d13cSopenharmony_ci    }
94bae4d13cSopenharmony_ci    if (retryCount >= SEND_RETRY_LIMIT || remSize != 0) {
95bae4d13cSopenharmony_ci        SEN_HILOGE("Send too many times:%{public}zu/%{public}zu, size:%{public}zu/%{public}zu, fd:%{public}d",
96bae4d13cSopenharmony_ci            retryCount, SEND_RETRY_LIMIT, idx, size, fd_);
97bae4d13cSopenharmony_ci        return false;
98bae4d13cSopenharmony_ci    }
99bae4d13cSopenharmony_ci    return true;
100bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
101bae4d13cSopenharmony_ci}
102bae4d13cSopenharmony_ci
103bae4d13cSopenharmony_civoid StreamSession::Close()
104bae4d13cSopenharmony_ci{
105bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
106bae4d13cSopenharmony_ci    StreamSessionClose(streamSessionPtr_.get());
107bae4d13cSopenharmony_ci    UpdateDescript();
108bae4d13cSopenharmony_ci#else
109bae4d13cSopenharmony_ci    if (fd_ >= 0) {
110bae4d13cSopenharmony_ci        close(fd_);
111bae4d13cSopenharmony_ci        fd_ = -1;
112bae4d13cSopenharmony_ci        UpdateDescript();
113bae4d13cSopenharmony_ci    }
114bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
115bae4d13cSopenharmony_ci}
116bae4d13cSopenharmony_ci
117bae4d13cSopenharmony_civoid StreamSession::UpdateDescript()
118bae4d13cSopenharmony_ci{
119bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
120bae4d13cSopenharmony_ci    std::ostringstream oss;
121bae4d13cSopenharmony_ci    oss << "fd = " << StreamSessionGetFd(streamSessionPtr_.get())
122bae4d13cSopenharmony_ci        << ", programName = " << programName_
123bae4d13cSopenharmony_ci        << ", moduleType = " << StreamSessionGetModuleType(streamSessionPtr_.get())
124bae4d13cSopenharmony_ci        << ((StreamSessionGetFd(streamSessionPtr_.get()) < 0) ? ", closed" : ", opened")
125bae4d13cSopenharmony_ci        << ", uid = " << StreamSessionGetUid(streamSessionPtr_.get())
126bae4d13cSopenharmony_ci        << ", pid = " << StreamSessionGetPid(streamSessionPtr_.get())
127bae4d13cSopenharmony_ci        << ", tokenType = " << StreamSessionGetTokenType(streamSessionPtr_.get())
128bae4d13cSopenharmony_ci        << std::endl;
129bae4d13cSopenharmony_ci    descript_ = oss.str().c_str();
130bae4d13cSopenharmony_ci#else
131bae4d13cSopenharmony_ci    std::ostringstream oss;
132bae4d13cSopenharmony_ci    oss << "fd = " << fd_
133bae4d13cSopenharmony_ci        << ", programName = " << programName_
134bae4d13cSopenharmony_ci        << ((fd_ < 0) ? ", closed" : ", opened")
135bae4d13cSopenharmony_ci        << ", uid = " << uid_
136bae4d13cSopenharmony_ci        << ", pid = " << pid_
137bae4d13cSopenharmony_ci        << ", tokenType = " << tokenType_
138bae4d13cSopenharmony_ci        << std::endl;
139bae4d13cSopenharmony_ci    descript_ = oss.str().c_str();
140bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
141bae4d13cSopenharmony_ci}
142bae4d13cSopenharmony_ci
143bae4d13cSopenharmony_cibool StreamSession::SendMsg(const NetPacket &pkt) const
144bae4d13cSopenharmony_ci{
145bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
146bae4d13cSopenharmony_ci    if (StreamBufferChkRWError(pkt.streamBufferPtr_.get())) {
147bae4d13cSopenharmony_ci        SEN_HILOGE("Read and write status is error");
148bae4d13cSopenharmony_ci        return false;
149bae4d13cSopenharmony_ci    }
150bae4d13cSopenharmony_ci    StreamBuffer buf;
151bae4d13cSopenharmony_ci    pkt.MakeData(buf);
152bae4d13cSopenharmony_ci    return SendMsg(StreamBufferData(buf.streamBufferPtr_.get()), StreamBufferSize(buf.streamBufferPtr_.get()));
153bae4d13cSopenharmony_ci#else
154bae4d13cSopenharmony_ci    if (pkt.ChkRWError()) {
155bae4d13cSopenharmony_ci        SEN_HILOGE("Read and write status failed");
156bae4d13cSopenharmony_ci        return false;
157bae4d13cSopenharmony_ci    }
158bae4d13cSopenharmony_ci    StreamBuffer buf;
159bae4d13cSopenharmony_ci    pkt.MakeData(buf);
160bae4d13cSopenharmony_ci    return SendMsg(buf.Data(), buf.Size());
161bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
162bae4d13cSopenharmony_ci}
163bae4d13cSopenharmony_ci
164bae4d13cSopenharmony_ciint32_t StreamSession::GetUid() const
165bae4d13cSopenharmony_ci{
166bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
167bae4d13cSopenharmony_ci    return StreamSessionGetUid(streamSessionPtr_.get());
168bae4d13cSopenharmony_ci#else
169bae4d13cSopenharmony_ci    return uid_;
170bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
171bae4d13cSopenharmony_ci}
172bae4d13cSopenharmony_ci
173bae4d13cSopenharmony_ciint32_t StreamSession::GetPid() const
174bae4d13cSopenharmony_ci{
175bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
176bae4d13cSopenharmony_ci    return StreamSessionGetPid(streamSessionPtr_.get());
177bae4d13cSopenharmony_ci#else
178bae4d13cSopenharmony_ci    return pid_;
179bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
180bae4d13cSopenharmony_ci}
181bae4d13cSopenharmony_ci
182bae4d13cSopenharmony_ciSessionPtr StreamSession::GetSharedPtr()
183bae4d13cSopenharmony_ci{
184bae4d13cSopenharmony_ci    return shared_from_this();
185bae4d13cSopenharmony_ci}
186bae4d13cSopenharmony_ci
187bae4d13cSopenharmony_ciint32_t StreamSession::GetFd() const
188bae4d13cSopenharmony_ci{
189bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
190bae4d13cSopenharmony_ci    return StreamSessionGetFd(streamSessionPtr_.get());
191bae4d13cSopenharmony_ci#else
192bae4d13cSopenharmony_ci    return fd_;
193bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
194bae4d13cSopenharmony_ci}
195bae4d13cSopenharmony_ci
196bae4d13cSopenharmony_ciconst std::string &StreamSession::GetDescript() const
197bae4d13cSopenharmony_ci{
198bae4d13cSopenharmony_ci    return descript_;
199bae4d13cSopenharmony_ci}
200bae4d13cSopenharmony_ci
201bae4d13cSopenharmony_ciconst std::string StreamSession::GetProgramName() const
202bae4d13cSopenharmony_ci{
203bae4d13cSopenharmony_ci    return programName_;
204bae4d13cSopenharmony_ci}
205bae4d13cSopenharmony_ci
206bae4d13cSopenharmony_civoid StreamSession::SetTokenType(int32_t type)
207bae4d13cSopenharmony_ci{
208bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
209bae4d13cSopenharmony_ci    StreamSessionSetTokenType(streamSessionPtr_.get(), type);
210bae4d13cSopenharmony_ci#else
211bae4d13cSopenharmony_ci    tokenType_ = type;
212bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
213bae4d13cSopenharmony_ci}
214bae4d13cSopenharmony_ci
215bae4d13cSopenharmony_ciint32_t StreamSession::GetTokenType() const
216bae4d13cSopenharmony_ci{
217bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST
218bae4d13cSopenharmony_ci    return StreamSessionGetTokenType(streamSessionPtr_.get());
219bae4d13cSopenharmony_ci#else
220bae4d13cSopenharmony_ci    return tokenType_;
221bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST
222bae4d13cSopenharmony_ci}
223bae4d13cSopenharmony_ci} // namespace Sensors
224bae4d13cSopenharmony_ci} // namespace OHOS