/* * Copyright (C) 2021 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "include/sp_utils.h" #include "include/startup_delay.h" #include "include/sp_log.h" #include "include/sdk_data_recv.h" #include "memory_collector.h" #include "collect_result.h" #include "include/sp_task.h" #include "include/sp_utils.h" #include "securec.h" namespace OHOS { namespace SmartPerf { SdkDataRecv::SdkDataRecv() { FD_ZERO(&readFds); } std::map SdkDataRecv::ItemData() { return std::map(); } int SdkDataRecv::CreateOhSocketServer(int basePort) { int i = 0; int socketFd = 0; struct sockaddr_in address; const int reuse = 1; socketFd = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); if (socketFd < 0) { LOGE("Create socket error."); return -1; } setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); std::fill_n(reinterpret_cast(&address), sizeof(address), 0); address.sin_family = AF_INET; address.sin_addr.s_addr = inet_addr("127.0.0.1"); for (i = 0; i < SOCKET_PORT_NUM_PER_TYPE; i++) { address.sin_port = htons(basePort + i); if (::bind(socketFd, reinterpret_cast(&address), sizeof(address)) == 0) { break; } } if (i >= SOCKET_PORT_NUM_PER_TYPE) { LOGE("Bind socket error."); return -1; } if (listen(socketFd, OH_SOCKET_MAX) < 0) { LOGE("Listen socket error."); close(socketFd); return -1; } LOGD("Listen port %d success, fd is %d", socketFd); return socketFd; } std::string SdkDataRecv::ProcessData(std::string message, ServerParams ¶ms) { std::stringstream ss(message); std::string item; std::string source; std::string timestamp; std::string eventName; std::string enable; std::string value; std::string realTimestamp; while (std::getline(ss, item, ',')) { std::stringstream itemSS(item); std::string first; std::string second; std::getline(itemSS, first, ':'); std::getline(itemSS, second, ':'); if (first == "src") { source = second; } else if (first == "para0") { eventName = second; } else if (first == "time") { realTimestamp = std::to_string(std::stoll(second) - SPTask::GetInstance().GetRealStartTime()); timestamp = std::to_string(std::stoll(second) - params.startTime); } else if (first == "enable") { enable = second; } else if (first == "value") { value = second; } } item = source + "," + timestamp + "," + eventName + "," + enable + "," + value + "\r\n"; sdkDataRealtimeData += source + "_" + realTimestamp + "_" + eventName + "_" + enable + "_" + value + ";"; return item; } std::string SdkDataRecv::OhDataReceive(int index, ServerParams ¶ms) { char receiveBuf[MSG_MAX_LEN]; std::string resStr; int readLen = 0; if ((readLen = read(params.receiveFd[index], receiveBuf, MSG_MAX_LEN)) <= 0) { close(params.receiveFd[index]); params.receiveFd[index] = -1; return ""; } LOGI("Fd %d,%d, receove %s", index, params.receiveFd[index], receiveBuf); receiveBuffer = receiveBuf; bool processFlag = true; while (processFlag) { size_t start = receiveBuffer.find('{'); if (start == std::string::npos) { processFlag = false; break; } size_t end = receiveBuffer.find('}', start); if (end == std::string::npos) { processFlag = false; break; } std::size_t startPosition = start + 1; std::size_t length = end > start ? end - start - 1 : 0; if (startPosition >= receiveBuffer.size() || length > receiveBuffer.size() - startPosition) { processFlag = false; break; } std::string message = receiveBuffer.substr(startPosition, length); resStr += ProcessData(message, params); receiveBuffer.erase(0, end + 1); const int bufferSizeCheck = 2; if (receiveBuffer.size() <= bufferSizeCheck) { processFlag = false; } } if (!resStr.empty() && resStr.back() == '\n') { resStr.pop_back(); } return resStr; } void SdkDataRecv::SetRunningState(bool state) { collectRunring = state; } void SdkDataRecv::ServerThread(std::vector &dataVec) { for (int i = 0; i < OH_SOCKET_MAX; i++) { sdkParams.receiveFd[i] = -1; } sdkParams.startTime = SPUtils::GetCurTime(); sdkParams.serverFd = CreateOhSocketServer(OH_DATA_PORT); if (sdkParams.serverFd < 0) { LOGE("Create server failed."); return; } if (pipe(sdkParams.pipFd) == -1) { LOGE("Create service failed."); close(sdkParams.serverFd); return; } listenFd = sdkParams.pipFd[1]; RunServerThread(dataVec, sdkParams); } void SdkDataRecv::RunServerThread(std::vector &dataVec, ServerParams ¶ms) { while (collectRunring) { SetUpFdSet(params); if (select(maxFd + 1, &readFds, nullptr, nullptr, nullptr) <= 0) { continue; } for (int i = 0; i < OH_SOCKET_MAX; i++) { HandleReceiveFd(dataVec, i, params); } HandleServerFd(params); } CleanUpResources(params); } void SdkDataRecv::SetUpFdSet(ServerParams ¶ms) { FD_ZERO(&readFds); FD_SET(params.serverFd, &readFds); FD_SET(params.pipFd[0], &readFds); maxFd = std::max(params.serverFd, params.pipFd[0]); for (int i = 0; i < OH_SOCKET_MAX; i++) { if (params.receiveFd[i] >= 0) { FD_SET(params.receiveFd[i], &readFds); maxFd = std::max(maxFd, params.receiveFd[i]); } } } void SdkDataRecv::HandleReceiveFd(std::vector &dataVec, int i, ServerParams ¶ms) { if (params.receiveFd[i] >= 0 && FD_ISSET(params.receiveFd[i], &readFds)) { std::string data = OhDataReceive(i, params); if (SPTask::GetInstance().GetRecordState()) { dataVec.push_back(data); } } } void SdkDataRecv::HandleServerFd(ServerParams ¶ms) { if (!FD_ISSET(params.serverFd, &readFds)) { return; } int fd = accept(params.serverFd, nullptr, nullptr); if (fd < 0) { return; } for (int i = 0; i < OH_SOCKET_MAX; i++) { if (params.receiveFd[i] < 0) { params.receiveFd[i] = fd; if (fd > maxFd) { maxFd = fd; } break; } } } void SdkDataRecv::CleanUpResources(ServerParams ¶ms) { if (params.serverFd != -1) { close(params.serverFd); params.serverFd = -1; } if (params.pipFd[0] != -1) { close(params.pipFd[0]); params.pipFd[0] = -1; } for (int i = 0; i < OH_SOCKET_MAX; i++) { if (params.receiveFd[i] != -1) { close(params.receiveFd[i]); params.receiveFd[i] = -1; } } } int SdkDataRecv::GetListenFd() { return listenFd; } void SdkDataRecv::SetListenFd(int fd) { listenFd = fd; } void SdkDataRecv::GetSdkDataRealtimeData(std::map &dataMap) { if (sdkDataRealtimeData.size() > 0) { std::map sdkDataRealtimeDataMap; sdkDataRealtimeDataMap["sdkData"] = sdkDataRealtimeData; realtimeDataLock.lock(); dataMap.insert(sdkDataRealtimeDataMap.begin(), sdkDataRealtimeDataMap.end()); realtimeDataLock.unlock(); sdkDataRealtimeData.clear(); } } void SdkDataRecv::SetStartRecordTime() { sdkParams.startTime = SPUtils::GetCurTime(); } } }