1bae4d13cSopenharmony_ci/* 2bae4d13cSopenharmony_ci * Copyright (c) 2023 Huawei Device Co., Ltd. 3bae4d13cSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4bae4d13cSopenharmony_ci * you may not use this file except in compliance with the License. 5bae4d13cSopenharmony_ci * You may obtain a copy of the License at 6bae4d13cSopenharmony_ci * 7bae4d13cSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8bae4d13cSopenharmony_ci * 9bae4d13cSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10bae4d13cSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11bae4d13cSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12bae4d13cSopenharmony_ci * See the License for the specific language governing permissions and 13bae4d13cSopenharmony_ci * limitations under the License. 14bae4d13cSopenharmony_ci */ 15bae4d13cSopenharmony_ci 16bae4d13cSopenharmony_ci#include "stream_session.h" 17bae4d13cSopenharmony_ci 18bae4d13cSopenharmony_ci#include <cinttypes> 19bae4d13cSopenharmony_ci#include <sstream> 20bae4d13cSopenharmony_ci 21bae4d13cSopenharmony_ci#include "proto.h" 22bae4d13cSopenharmony_ci#include "sensor_errors.h" 23bae4d13cSopenharmony_ci#include "stream_socket.h" 24bae4d13cSopenharmony_ci 25bae4d13cSopenharmony_ci#undef LOG_TAG 26bae4d13cSopenharmony_ci#define LOG_TAG "StreamSession" 27bae4d13cSopenharmony_ci 28bae4d13cSopenharmony_cinamespace OHOS { 29bae4d13cSopenharmony_cinamespace Sensors { 30bae4d13cSopenharmony_ci 31bae4d13cSopenharmony_ciStreamSession::StreamSession(const std::string &programName, const int32_t fd, const int32_t uid, const int32_t pid) 32bae4d13cSopenharmony_ci : programName_(programName) 33bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 34bae4d13cSopenharmony_ci{ 35bae4d13cSopenharmony_ci StreamSessionSetFd(streamSessionPtr_.get(), fd); 36bae4d13cSopenharmony_ci StreamSessionSetUid(streamSessionPtr_.get(), uid); 37bae4d13cSopenharmony_ci StreamSessionSetPid(streamSessionPtr_.get(), pid); 38bae4d13cSopenharmony_ci UpdateDescript(); 39bae4d13cSopenharmony_ci} 40bae4d13cSopenharmony_ci#else 41bae4d13cSopenharmony_ci, 42bae4d13cSopenharmony_ci fd_(fd), 43bae4d13cSopenharmony_ci uid_(uid), 44bae4d13cSopenharmony_ci pid_(pid) 45bae4d13cSopenharmony_ci{ 46bae4d13cSopenharmony_ci UpdateDescript(); 47bae4d13cSopenharmony_ci} 48bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST 49bae4d13cSopenharmony_ci 50bae4d13cSopenharmony_ci 51bae4d13cSopenharmony_cibool StreamSession::SendMsg(const char *buf, size_t size) const 52bae4d13cSopenharmony_ci{ 53bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 54bae4d13cSopenharmony_ci return StreamSessionSendMsg(streamSessionPtr_.get(), buf, size); 55bae4d13cSopenharmony_ci#else 56bae4d13cSopenharmony_ci CHKPF(buf); 57bae4d13cSopenharmony_ci if ((size == 0) || (size > MAX_PACKET_BUF_SIZE)) { 58bae4d13cSopenharmony_ci SEN_HILOGE("buf size:%{public}zu", size); 59bae4d13cSopenharmony_ci return false; 60bae4d13cSopenharmony_ci } 61bae4d13cSopenharmony_ci if (fd_ < 0) { 62bae4d13cSopenharmony_ci SEN_HILOGE("The fd_ is less than 0"); 63bae4d13cSopenharmony_ci return false; 64bae4d13cSopenharmony_ci } 65bae4d13cSopenharmony_ci size_t idx = 0; 66bae4d13cSopenharmony_ci size_t retryCount = 0; 67bae4d13cSopenharmony_ci size_t remSize = size; 68bae4d13cSopenharmony_ci while (remSize > 0 && retryCount < SEND_RETRY_LIMIT) { 69bae4d13cSopenharmony_ci ++retryCount; 70bae4d13cSopenharmony_ci auto count = send(fd_, &buf[idx], remSize, MSG_DONTWAIT | MSG_NOSIGNAL); 71bae4d13cSopenharmony_ci if (count < 0) { 72bae4d13cSopenharmony_ci if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) { 73bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 74bae4d13cSopenharmony_ci sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME)); 75bae4d13cSopenharmony_ci#else 76bae4d13cSopenharmony_ci usleep(SEND_RETRY_SLEEP_TIME); 77bae4d13cSopenharmony_ci#endif 78bae4d13cSopenharmony_ci SEN_HILOGW("Continue for errno EAGAIN|EINTR|EWOULDBLOCK, errno:%{public}d", errno); 79bae4d13cSopenharmony_ci continue; 80bae4d13cSopenharmony_ci } 81bae4d13cSopenharmony_ci SEN_HILOGE("Send return failed, error:%{public}d, fd:%{public}d", errno, fd_); 82bae4d13cSopenharmony_ci return false; 83bae4d13cSopenharmony_ci } 84bae4d13cSopenharmony_ci idx += static_cast<size_t>(count); 85bae4d13cSopenharmony_ci remSize -= static_cast<size_t>(count); 86bae4d13cSopenharmony_ci if (remSize > 0) { 87bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 88bae4d13cSopenharmony_ci sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME)); 89bae4d13cSopenharmony_ci#else 90bae4d13cSopenharmony_ci usleep(SEND_RETRY_SLEEP_TIME); 91bae4d13cSopenharmony_ci#endif 92bae4d13cSopenharmony_ci } 93bae4d13cSopenharmony_ci } 94bae4d13cSopenharmony_ci if (retryCount >= SEND_RETRY_LIMIT || remSize != 0) { 95bae4d13cSopenharmony_ci SEN_HILOGE("Send too many times:%{public}zu/%{public}zu, size:%{public}zu/%{public}zu, fd:%{public}d", 96bae4d13cSopenharmony_ci retryCount, SEND_RETRY_LIMIT, idx, size, fd_); 97bae4d13cSopenharmony_ci return false; 98bae4d13cSopenharmony_ci } 99bae4d13cSopenharmony_ci return true; 100bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST 101bae4d13cSopenharmony_ci} 102bae4d13cSopenharmony_ci 103bae4d13cSopenharmony_civoid StreamSession::Close() 104bae4d13cSopenharmony_ci{ 105bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 106bae4d13cSopenharmony_ci StreamSessionClose(streamSessionPtr_.get()); 107bae4d13cSopenharmony_ci UpdateDescript(); 108bae4d13cSopenharmony_ci#else 109bae4d13cSopenharmony_ci if (fd_ >= 0) { 110bae4d13cSopenharmony_ci close(fd_); 111bae4d13cSopenharmony_ci fd_ = -1; 112bae4d13cSopenharmony_ci UpdateDescript(); 113bae4d13cSopenharmony_ci } 114bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST 115bae4d13cSopenharmony_ci} 116bae4d13cSopenharmony_ci 117bae4d13cSopenharmony_civoid StreamSession::UpdateDescript() 118bae4d13cSopenharmony_ci{ 119bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 120bae4d13cSopenharmony_ci std::ostringstream oss; 121bae4d13cSopenharmony_ci oss << "fd = " << StreamSessionGetFd(streamSessionPtr_.get()) 122bae4d13cSopenharmony_ci << ", programName = " << programName_ 123bae4d13cSopenharmony_ci << ", moduleType = " << StreamSessionGetModuleType(streamSessionPtr_.get()) 124bae4d13cSopenharmony_ci << ((StreamSessionGetFd(streamSessionPtr_.get()) < 0) ? ", closed" : ", opened") 125bae4d13cSopenharmony_ci << ", uid = " << StreamSessionGetUid(streamSessionPtr_.get()) 126bae4d13cSopenharmony_ci << ", pid = " << StreamSessionGetPid(streamSessionPtr_.get()) 127bae4d13cSopenharmony_ci << ", tokenType = " << StreamSessionGetTokenType(streamSessionPtr_.get()) 128bae4d13cSopenharmony_ci << std::endl; 129bae4d13cSopenharmony_ci descript_ = oss.str().c_str(); 130bae4d13cSopenharmony_ci#else 131bae4d13cSopenharmony_ci std::ostringstream oss; 132bae4d13cSopenharmony_ci oss << "fd = " << fd_ 133bae4d13cSopenharmony_ci << ", programName = " << programName_ 134bae4d13cSopenharmony_ci << ((fd_ < 0) ? ", closed" : ", opened") 135bae4d13cSopenharmony_ci << ", uid = " << uid_ 136bae4d13cSopenharmony_ci << ", pid = " << pid_ 137bae4d13cSopenharmony_ci << ", tokenType = " << tokenType_ 138bae4d13cSopenharmony_ci << std::endl; 139bae4d13cSopenharmony_ci descript_ = oss.str().c_str(); 140bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST 141bae4d13cSopenharmony_ci} 142bae4d13cSopenharmony_ci 143bae4d13cSopenharmony_cibool StreamSession::SendMsg(const NetPacket &pkt) const 144bae4d13cSopenharmony_ci{ 145bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 146bae4d13cSopenharmony_ci if (StreamBufferChkRWError(pkt.streamBufferPtr_.get())) { 147bae4d13cSopenharmony_ci SEN_HILOGE("Read and write status is error"); 148bae4d13cSopenharmony_ci return false; 149bae4d13cSopenharmony_ci } 150bae4d13cSopenharmony_ci StreamBuffer buf; 151bae4d13cSopenharmony_ci pkt.MakeData(buf); 152bae4d13cSopenharmony_ci return SendMsg(StreamBufferData(buf.streamBufferPtr_.get()), StreamBufferSize(buf.streamBufferPtr_.get())); 153bae4d13cSopenharmony_ci#else 154bae4d13cSopenharmony_ci if (pkt.ChkRWError()) { 155bae4d13cSopenharmony_ci SEN_HILOGE("Read and write status failed"); 156bae4d13cSopenharmony_ci return false; 157bae4d13cSopenharmony_ci } 158bae4d13cSopenharmony_ci StreamBuffer buf; 159bae4d13cSopenharmony_ci pkt.MakeData(buf); 160bae4d13cSopenharmony_ci return SendMsg(buf.Data(), buf.Size()); 161bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST 162bae4d13cSopenharmony_ci} 163bae4d13cSopenharmony_ci 164bae4d13cSopenharmony_ciint32_t StreamSession::GetUid() const 165bae4d13cSopenharmony_ci{ 166bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 167bae4d13cSopenharmony_ci return StreamSessionGetUid(streamSessionPtr_.get()); 168bae4d13cSopenharmony_ci#else 169bae4d13cSopenharmony_ci return uid_; 170bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST 171bae4d13cSopenharmony_ci} 172bae4d13cSopenharmony_ci 173bae4d13cSopenharmony_ciint32_t StreamSession::GetPid() const 174bae4d13cSopenharmony_ci{ 175bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 176bae4d13cSopenharmony_ci return StreamSessionGetPid(streamSessionPtr_.get()); 177bae4d13cSopenharmony_ci#else 178bae4d13cSopenharmony_ci return pid_; 179bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST 180bae4d13cSopenharmony_ci} 181bae4d13cSopenharmony_ci 182bae4d13cSopenharmony_ciSessionPtr StreamSession::GetSharedPtr() 183bae4d13cSopenharmony_ci{ 184bae4d13cSopenharmony_ci return shared_from_this(); 185bae4d13cSopenharmony_ci} 186bae4d13cSopenharmony_ci 187bae4d13cSopenharmony_ciint32_t StreamSession::GetFd() const 188bae4d13cSopenharmony_ci{ 189bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 190bae4d13cSopenharmony_ci return StreamSessionGetFd(streamSessionPtr_.get()); 191bae4d13cSopenharmony_ci#else 192bae4d13cSopenharmony_ci return fd_; 193bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST 194bae4d13cSopenharmony_ci} 195bae4d13cSopenharmony_ci 196bae4d13cSopenharmony_ciconst std::string &StreamSession::GetDescript() const 197bae4d13cSopenharmony_ci{ 198bae4d13cSopenharmony_ci return descript_; 199bae4d13cSopenharmony_ci} 200bae4d13cSopenharmony_ci 201bae4d13cSopenharmony_ciconst std::string StreamSession::GetProgramName() const 202bae4d13cSopenharmony_ci{ 203bae4d13cSopenharmony_ci return programName_; 204bae4d13cSopenharmony_ci} 205bae4d13cSopenharmony_ci 206bae4d13cSopenharmony_civoid StreamSession::SetTokenType(int32_t type) 207bae4d13cSopenharmony_ci{ 208bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 209bae4d13cSopenharmony_ci StreamSessionSetTokenType(streamSessionPtr_.get(), type); 210bae4d13cSopenharmony_ci#else 211bae4d13cSopenharmony_ci tokenType_ = type; 212bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST 213bae4d13cSopenharmony_ci} 214bae4d13cSopenharmony_ci 215bae4d13cSopenharmony_ciint32_t StreamSession::GetTokenType() const 216bae4d13cSopenharmony_ci{ 217bae4d13cSopenharmony_ci#ifdef OHOS_BUILD_ENABLE_RUST 218bae4d13cSopenharmony_ci return StreamSessionGetTokenType(streamSessionPtr_.get()); 219bae4d13cSopenharmony_ci#else 220bae4d13cSopenharmony_ci return tokenType_; 221bae4d13cSopenharmony_ci#endif // OHOS_BUILD_ENABLE_RUST 222bae4d13cSopenharmony_ci} 223bae4d13cSopenharmony_ci} // namespace Sensors 224bae4d13cSopenharmony_ci} // namespace OHOS