1/* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include <sstream> 16#include <fstream> 17#include <climits> 18#include <cstring> 19#include <cstdio> 20#include <algorithm> 21#include <iostream> 22#include <thread> 23#include <unistd.h> 24#include <string> 25#include <regex> 26#include <cstdarg> 27#include <sys/time.h> 28#include <sys/select.h> 29#include <netinet/in.h> 30#include "include/sp_utils.h" 31#include "include/startup_delay.h" 32#include "include/sp_log.h" 33#include "include/sdk_data_recv.h" 34#include "memory_collector.h" 35#include "collect_result.h" 36#include "include/sp_task.h" 37#include "include/sp_utils.h" 38#include "securec.h" 39namespace OHOS { 40 namespace SmartPerf { 41 SdkDataRecv::SdkDataRecv() 42 { 43 FD_ZERO(&readFds); 44 } 45 46 std::map<std::string, std::string> SdkDataRecv::ItemData() 47 { 48 return std::map<std::string, std::string>(); 49 } 50 51 int SdkDataRecv::CreateOhSocketServer(int basePort) 52 { 53 int i = 0; 54 int socketFd = 0; 55 struct sockaddr_in address; 56 const int reuse = 1; 57 58 socketFd = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); 59 if (socketFd < 0) { 60 LOGE("Create socket error."); 61 return -1; 62 } 63 setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); 64 65 std::fill_n(reinterpret_cast<char*>(&address), sizeof(address), 0); 66 address.sin_family = AF_INET; 67 address.sin_addr.s_addr = inet_addr("127.0.0.1"); 68 69 for (i = 0; i < SOCKET_PORT_NUM_PER_TYPE; i++) { 70 address.sin_port = htons(basePort + i); 71 if (::bind(socketFd, reinterpret_cast<struct sockaddr *>(&address), sizeof(address)) == 0) { 72 break; 73 } 74 } 75 76 if (i >= SOCKET_PORT_NUM_PER_TYPE) { 77 LOGE("Bind socket error."); 78 return -1; 79 } 80 81 if (listen(socketFd, OH_SOCKET_MAX) < 0) { 82 LOGE("Listen socket error."); 83 close(socketFd); 84 return -1; 85 } 86 87 LOGD("Listen port %d success, fd is %d", socketFd); 88 return socketFd; 89 } 90 std::string SdkDataRecv::ProcessData(std::string message, ServerParams ¶ms) 91 { 92 std::stringstream ss(message); 93 std::string item; 94 std::string source; 95 std::string timestamp; 96 std::string eventName; 97 std::string enable; 98 std::string value; 99 std::string realTimestamp; 100 while (std::getline(ss, item, ',')) { 101 std::stringstream itemSS(item); 102 std::string first; 103 std::string second; 104 std::getline(itemSS, first, ':'); 105 std::getline(itemSS, second, ':'); 106 if (first == "src") { 107 source = second; 108 } else if (first == "para0") { 109 eventName = second; 110 } else if (first == "time") { 111 realTimestamp = std::to_string(std::stoll(second) - SPTask::GetInstance().GetRealStartTime()); 112 timestamp = std::to_string(std::stoll(second) - params.startTime); 113 } else if (first == "enable") { 114 enable = second; 115 } else if (first == "value") { 116 value = second; 117 } 118 } 119 item = source + "," + timestamp + "," + eventName + "," + enable + "," + value + "\r\n"; 120 sdkDataRealtimeData += source + "_" + realTimestamp + "_" + eventName + "_" + enable + "_" + value + ";"; 121 return item; 122 } 123 124 std::string SdkDataRecv::OhDataReceive(int index, ServerParams ¶ms) 125 { 126 char receiveBuf[MSG_MAX_LEN]; 127 std::string resStr; 128 int readLen = 0; 129 if ((readLen = read(params.receiveFd[index], receiveBuf, MSG_MAX_LEN)) <= 0) { 130 close(params.receiveFd[index]); 131 params.receiveFd[index] = -1; 132 return ""; 133 } 134 LOGI("Fd %d,%d, receove %s", index, params.receiveFd[index], receiveBuf); 135 receiveBuffer = receiveBuf; 136 137 bool processFlag = true; 138 while (processFlag) { 139 size_t start = receiveBuffer.find('{'); 140 if (start == std::string::npos) { 141 processFlag = false; 142 break; 143 } 144 145 size_t end = receiveBuffer.find('}', start); 146 if (end == std::string::npos) { 147 processFlag = false; 148 break; 149 } 150 151 std::size_t startPosition = start + 1; 152 std::size_t length = end > start ? end - start - 1 : 0; 153 154 if (startPosition >= receiveBuffer.size() || length > receiveBuffer.size() - startPosition) { 155 processFlag = false; 156 break; 157 } 158 159 std::string message = receiveBuffer.substr(startPosition, length); 160 resStr += ProcessData(message, params); 161 162 receiveBuffer.erase(0, end + 1); 163 164 const int bufferSizeCheck = 2; 165 if (receiveBuffer.size() <= bufferSizeCheck) { 166 processFlag = false; 167 } 168 } 169 170 if (!resStr.empty() && resStr.back() == '\n') { 171 resStr.pop_back(); 172 } 173 return resStr; 174 } 175 176 void SdkDataRecv::SetRunningState(bool state) 177 { 178 collectRunring = state; 179 } 180 181 void SdkDataRecv::ServerThread(std::vector<std::string> &dataVec) 182 { 183 for (int i = 0; i < OH_SOCKET_MAX; i++) { 184 sdkParams.receiveFd[i] = -1; 185 } 186 sdkParams.startTime = SPUtils::GetCurTime(); 187 sdkParams.serverFd = CreateOhSocketServer(OH_DATA_PORT); 188 if (sdkParams.serverFd < 0) { 189 LOGE("Create server failed."); 190 return; 191 } 192 if (pipe(sdkParams.pipFd) == -1) { 193 LOGE("Create service failed."); 194 close(sdkParams.serverFd); 195 return; 196 } 197 listenFd = sdkParams.pipFd[1]; 198 RunServerThread(dataVec, sdkParams); 199 } 200 201 void SdkDataRecv::RunServerThread(std::vector<std::string> &dataVec, ServerParams ¶ms) 202 { 203 while (collectRunring) { 204 SetUpFdSet(params); 205 if (select(maxFd + 1, &readFds, nullptr, nullptr, nullptr) <= 0) { 206 continue; 207 } 208 for (int i = 0; i < OH_SOCKET_MAX; i++) { 209 HandleReceiveFd(dataVec, i, params); 210 } 211 HandleServerFd(params); 212 } 213 CleanUpResources(params); 214 } 215 216 void SdkDataRecv::SetUpFdSet(ServerParams ¶ms) 217 { 218 FD_ZERO(&readFds); 219 FD_SET(params.serverFd, &readFds); 220 FD_SET(params.pipFd[0], &readFds); 221 222 maxFd = std::max(params.serverFd, params.pipFd[0]); 223 for (int i = 0; i < OH_SOCKET_MAX; i++) { 224 if (params.receiveFd[i] >= 0) { 225 FD_SET(params.receiveFd[i], &readFds); 226 maxFd = std::max(maxFd, params.receiveFd[i]); 227 } 228 } 229 } 230 231 void SdkDataRecv::HandleReceiveFd(std::vector<std::string> &dataVec, int i, ServerParams ¶ms) 232 { 233 if (params.receiveFd[i] >= 0 && FD_ISSET(params.receiveFd[i], &readFds)) { 234 std::string data = OhDataReceive(i, params); 235 if (SPTask::GetInstance().GetRecordState()) { 236 dataVec.push_back(data); 237 } 238 } 239 } 240 241 void SdkDataRecv::HandleServerFd(ServerParams ¶ms) 242 { 243 if (!FD_ISSET(params.serverFd, &readFds)) { 244 return; 245 } 246 247 int fd = accept(params.serverFd, nullptr, nullptr); 248 if (fd < 0) { 249 return; 250 } 251 252 for (int i = 0; i < OH_SOCKET_MAX; i++) { 253 if (params.receiveFd[i] < 0) { 254 params.receiveFd[i] = fd; 255 if (fd > maxFd) { 256 maxFd = fd; 257 } 258 break; 259 } 260 } 261 } 262 263 void SdkDataRecv::CleanUpResources(ServerParams ¶ms) 264 { 265 if (params.serverFd != -1) { 266 close(params.serverFd); 267 params.serverFd = -1; 268 } 269 if (params.pipFd[0] != -1) { 270 close(params.pipFd[0]); 271 params.pipFd[0] = -1; 272 } 273 for (int i = 0; i < OH_SOCKET_MAX; i++) { 274 if (params.receiveFd[i] != -1) { 275 close(params.receiveFd[i]); 276 params.receiveFd[i] = -1; 277 } 278 } 279 } 280 281 int SdkDataRecv::GetListenFd() 282 { 283 return listenFd; 284 } 285 void SdkDataRecv::SetListenFd(int fd) 286 { 287 listenFd = fd; 288 } 289 290 void SdkDataRecv::GetSdkDataRealtimeData(std::map<std::string, std::string> &dataMap) 291 { 292 if (sdkDataRealtimeData.size() > 0) { 293 std::map<std::string, std::string> sdkDataRealtimeDataMap; 294 sdkDataRealtimeDataMap["sdkData"] = sdkDataRealtimeData; 295 realtimeDataLock.lock(); 296 dataMap.insert(sdkDataRealtimeDataMap.begin(), sdkDataRealtimeDataMap.end()); 297 realtimeDataLock.unlock(); 298 sdkDataRealtimeData.clear(); 299 } 300 } 301 302 void SdkDataRecv::SetStartRecordTime() 303 { 304 sdkParams.startTime = SPUtils::GetCurTime(); 305 } 306 } 307}