1/* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "stream_session.h" 17 18#include <cinttypes> 19#include <sstream> 20 21#include "proto.h" 22#include "sensor_errors.h" 23#include "stream_socket.h" 24 25#undef LOG_TAG 26#define LOG_TAG "StreamSession" 27 28namespace OHOS { 29namespace Sensors { 30 31StreamSession::StreamSession(const std::string &programName, const int32_t fd, const int32_t uid, const int32_t pid) 32 : programName_(programName) 33#ifdef OHOS_BUILD_ENABLE_RUST 34{ 35 StreamSessionSetFd(streamSessionPtr_.get(), fd); 36 StreamSessionSetUid(streamSessionPtr_.get(), uid); 37 StreamSessionSetPid(streamSessionPtr_.get(), pid); 38 UpdateDescript(); 39} 40#else 41, 42 fd_(fd), 43 uid_(uid), 44 pid_(pid) 45{ 46 UpdateDescript(); 47} 48#endif // OHOS_BUILD_ENABLE_RUST 49 50 51bool StreamSession::SendMsg(const char *buf, size_t size) const 52{ 53#ifdef OHOS_BUILD_ENABLE_RUST 54 return StreamSessionSendMsg(streamSessionPtr_.get(), buf, size); 55#else 56 CHKPF(buf); 57 if ((size == 0) || (size > MAX_PACKET_BUF_SIZE)) { 58 SEN_HILOGE("buf size:%{public}zu", size); 59 return false; 60 } 61 if (fd_ < 0) { 62 SEN_HILOGE("The fd_ is less than 0"); 63 return false; 64 } 65 size_t idx = 0; 66 size_t retryCount = 0; 67 size_t remSize = size; 68 while (remSize > 0 && retryCount < SEND_RETRY_LIMIT) { 69 ++retryCount; 70 auto count = send(fd_, &buf[idx], remSize, MSG_DONTWAIT | MSG_NOSIGNAL); 71 if (count < 0) { 72 if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) { 73#ifdef OHOS_BUILD_ENABLE_RUST 74 sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME)); 75#else 76 usleep(SEND_RETRY_SLEEP_TIME); 77#endif 78 SEN_HILOGW("Continue for errno EAGAIN|EINTR|EWOULDBLOCK, errno:%{public}d", errno); 79 continue; 80 } 81 SEN_HILOGE("Send return failed, error:%{public}d, fd:%{public}d", errno, fd_); 82 return false; 83 } 84 idx += static_cast<size_t>(count); 85 remSize -= static_cast<size_t>(count); 86 if (remSize > 0) { 87#ifdef OHOS_BUILD_ENABLE_RUST 88 sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME)); 89#else 90 usleep(SEND_RETRY_SLEEP_TIME); 91#endif 92 } 93 } 94 if (retryCount >= SEND_RETRY_LIMIT || remSize != 0) { 95 SEN_HILOGE("Send too many times:%{public}zu/%{public}zu, size:%{public}zu/%{public}zu, fd:%{public}d", 96 retryCount, SEND_RETRY_LIMIT, idx, size, fd_); 97 return false; 98 } 99 return true; 100#endif // OHOS_BUILD_ENABLE_RUST 101} 102 103void StreamSession::Close() 104{ 105#ifdef OHOS_BUILD_ENABLE_RUST 106 StreamSessionClose(streamSessionPtr_.get()); 107 UpdateDescript(); 108#else 109 if (fd_ >= 0) { 110 close(fd_); 111 fd_ = -1; 112 UpdateDescript(); 113 } 114#endif // OHOS_BUILD_ENABLE_RUST 115} 116 117void StreamSession::UpdateDescript() 118{ 119#ifdef OHOS_BUILD_ENABLE_RUST 120 std::ostringstream oss; 121 oss << "fd = " << StreamSessionGetFd(streamSessionPtr_.get()) 122 << ", programName = " << programName_ 123 << ", moduleType = " << StreamSessionGetModuleType(streamSessionPtr_.get()) 124 << ((StreamSessionGetFd(streamSessionPtr_.get()) < 0) ? ", closed" : ", opened") 125 << ", uid = " << StreamSessionGetUid(streamSessionPtr_.get()) 126 << ", pid = " << StreamSessionGetPid(streamSessionPtr_.get()) 127 << ", tokenType = " << StreamSessionGetTokenType(streamSessionPtr_.get()) 128 << std::endl; 129 descript_ = oss.str().c_str(); 130#else 131 std::ostringstream oss; 132 oss << "fd = " << fd_ 133 << ", programName = " << programName_ 134 << ((fd_ < 0) ? ", closed" : ", opened") 135 << ", uid = " << uid_ 136 << ", pid = " << pid_ 137 << ", tokenType = " << tokenType_ 138 << std::endl; 139 descript_ = oss.str().c_str(); 140#endif // OHOS_BUILD_ENABLE_RUST 141} 142 143bool StreamSession::SendMsg(const NetPacket &pkt) const 144{ 145#ifdef OHOS_BUILD_ENABLE_RUST 146 if (StreamBufferChkRWError(pkt.streamBufferPtr_.get())) { 147 SEN_HILOGE("Read and write status is error"); 148 return false; 149 } 150 StreamBuffer buf; 151 pkt.MakeData(buf); 152 return SendMsg(StreamBufferData(buf.streamBufferPtr_.get()), StreamBufferSize(buf.streamBufferPtr_.get())); 153#else 154 if (pkt.ChkRWError()) { 155 SEN_HILOGE("Read and write status failed"); 156 return false; 157 } 158 StreamBuffer buf; 159 pkt.MakeData(buf); 160 return SendMsg(buf.Data(), buf.Size()); 161#endif // OHOS_BUILD_ENABLE_RUST 162} 163 164int32_t StreamSession::GetUid() const 165{ 166#ifdef OHOS_BUILD_ENABLE_RUST 167 return StreamSessionGetUid(streamSessionPtr_.get()); 168#else 169 return uid_; 170#endif // OHOS_BUILD_ENABLE_RUST 171} 172 173int32_t StreamSession::GetPid() const 174{ 175#ifdef OHOS_BUILD_ENABLE_RUST 176 return StreamSessionGetPid(streamSessionPtr_.get()); 177#else 178 return pid_; 179#endif // OHOS_BUILD_ENABLE_RUST 180} 181 182SessionPtr StreamSession::GetSharedPtr() 183{ 184 return shared_from_this(); 185} 186 187int32_t StreamSession::GetFd() const 188{ 189#ifdef OHOS_BUILD_ENABLE_RUST 190 return StreamSessionGetFd(streamSessionPtr_.get()); 191#else 192 return fd_; 193#endif // OHOS_BUILD_ENABLE_RUST 194} 195 196const std::string &StreamSession::GetDescript() const 197{ 198 return descript_; 199} 200 201const std::string StreamSession::GetProgramName() const 202{ 203 return programName_; 204} 205 206void StreamSession::SetTokenType(int32_t type) 207{ 208#ifdef OHOS_BUILD_ENABLE_RUST 209 StreamSessionSetTokenType(streamSessionPtr_.get(), type); 210#else 211 tokenType_ = type; 212#endif // OHOS_BUILD_ENABLE_RUST 213} 214 215int32_t StreamSession::GetTokenType() const 216{ 217#ifdef OHOS_BUILD_ENABLE_RUST 218 return StreamSessionGetTokenType(streamSessionPtr_.get()); 219#else 220 return tokenType_; 221#endif // OHOS_BUILD_ENABLE_RUST 222} 223} // namespace Sensors 224} // namespace OHOS