1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <sstream>
16 #include <fstream>
17 #include <climits>
18 #include <cstring>
19 #include <cstdio>
20 #include <algorithm>
21 #include <iostream>
22 #include <thread>
23 #include <unistd.h>
24 #include <string>
25 #include <regex>
26 #include <cstdarg>
27 #include <sys/time.h>
28 #include <sys/select.h>
29 #include <netinet/in.h>
30 #include "include/sp_utils.h"
31 #include "include/startup_delay.h"
32 #include "include/sp_log.h"
33 #include "include/sdk_data_recv.h"
34 #include "memory_collector.h"
35 #include "collect_result.h"
36 #include "include/sp_task.h"
37 #include "include/sp_utils.h"
38 #include "securec.h"
39 namespace OHOS {
40     namespace SmartPerf {
SdkDataRecv()41         SdkDataRecv::SdkDataRecv()
42         {
43             FD_ZERO(&readFds);
44         }
45 
ItemData()46         std::map<std::string, std::string> SdkDataRecv::ItemData()
47         {
48             return std::map<std::string, std::string>();
49         }
50 
CreateOhSocketServer(int basePort)51         int SdkDataRecv::CreateOhSocketServer(int basePort)
52         {
53             int i = 0;
54             int socketFd = 0;
55             struct sockaddr_in address;
56             const int reuse = 1;
57 
58             socketFd = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
59             if (socketFd < 0) {
60                 LOGE("Create socket error.");
61                 return -1;
62             }
63             setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
64 
65             std::fill_n(reinterpret_cast<char*>(&address), sizeof(address), 0);
66             address.sin_family = AF_INET;
67             address.sin_addr.s_addr = inet_addr("127.0.0.1");
68 
69             for (i = 0; i < SOCKET_PORT_NUM_PER_TYPE; i++) {
70                 address.sin_port = htons(basePort + i);
71                 if (::bind(socketFd, reinterpret_cast<struct sockaddr *>(&address), sizeof(address)) == 0) {
72                     break;
73                 }
74             }
75 
76             if (i >= SOCKET_PORT_NUM_PER_TYPE) {
77                 LOGE("Bind socket error.");
78                 return -1;
79             }
80 
81             if (listen(socketFd, OH_SOCKET_MAX) < 0) {
82                 LOGE("Listen socket error.");
83                 close(socketFd);
84                 return -1;
85             }
86 
87             LOGD("Listen port %d success, fd is %d", socketFd);
88             return socketFd;
89         }
ProcessData(std::string message, ServerParams &params)90         std::string SdkDataRecv::ProcessData(std::string message, ServerParams &params)
91         {
92             std::stringstream ss(message);
93             std::string item;
94             std::string source;
95             std::string timestamp;
96             std::string eventName;
97             std::string enable;
98             std::string value;
99             std::string realTimestamp;
100             while (std::getline(ss, item, ',')) {
101                 std::stringstream itemSS(item);
102                 std::string first;
103                 std::string second;
104                 std::getline(itemSS, first, ':');
105                 std::getline(itemSS, second, ':');
106                 if (first == "src") {
107                     source = second;
108                 } else if (first == "para0") {
109                     eventName = second;
110                 } else if (first == "time") {
111                     realTimestamp = std::to_string(std::stoll(second) - SPTask::GetInstance().GetRealStartTime());
112                     timestamp = std::to_string(std::stoll(second) - params.startTime);
113                 } else if (first == "enable") {
114                     enable = second;
115                 } else if (first == "value") {
116                     value = second;
117                 }
118             }
119             item = source + "," + timestamp + "," + eventName + "," + enable + "," + value + "\r\n";
120             sdkDataRealtimeData += source + "_" + realTimestamp + "_" + eventName + "_" + enable + "_" + value + ";";
121             return item;
122         }
123 
OhDataReceive(int index, ServerParams &params)124         std::string SdkDataRecv::OhDataReceive(int index, ServerParams &params)
125         {
126             char receiveBuf[MSG_MAX_LEN];
127             std::string resStr;
128             int readLen = 0;
129             if ((readLen = read(params.receiveFd[index], receiveBuf, MSG_MAX_LEN)) <= 0) {
130                 close(params.receiveFd[index]);
131                 params.receiveFd[index] = -1;
132                 return "";
133             }
134             LOGI("Fd %d,%d, receove %s", index, params.receiveFd[index], receiveBuf);
135             receiveBuffer = receiveBuf;
136 
137             bool processFlag = true;
138             while (processFlag) {
139                 size_t start = receiveBuffer.find('{');
140                 if (start == std::string::npos) {
141                     processFlag = false;
142                     break;
143                 }
144 
145                 size_t end = receiveBuffer.find('}', start);
146                 if (end == std::string::npos) {
147                     processFlag = false;
148                     break;
149                 }
150 
151                 std::size_t startPosition = start + 1;
152                 std::size_t length = end > start ? end - start - 1 : 0;
153 
154                 if (startPosition >= receiveBuffer.size() || length > receiveBuffer.size() - startPosition) {
155                     processFlag = false;
156                     break;
157                 }
158 
159                 std::string message = receiveBuffer.substr(startPosition, length);
160                 resStr += ProcessData(message, params);
161 
162                 receiveBuffer.erase(0, end + 1);
163 
164                 const int bufferSizeCheck = 2;
165                 if (receiveBuffer.size() <= bufferSizeCheck) {
166                     processFlag = false;
167                 }
168             }
169 
170             if (!resStr.empty() && resStr.back() == '\n') {
171                 resStr.pop_back();
172             }
173             return resStr;
174         }
175 
SetRunningState(bool state)176         void SdkDataRecv::SetRunningState(bool state)
177         {
178             collectRunring = state;
179         }
180 
ServerThread(std::vector<std::string> &dataVec)181         void SdkDataRecv::ServerThread(std::vector<std::string> &dataVec)
182         {
183             for (int i = 0; i < OH_SOCKET_MAX; i++) {
184                 sdkParams.receiveFd[i] = -1;
185             }
186             sdkParams.startTime = SPUtils::GetCurTime();
187             sdkParams.serverFd = CreateOhSocketServer(OH_DATA_PORT);
188             if (sdkParams.serverFd < 0) {
189                 LOGE("Create server failed.");
190                 return;
191             }
192             if (pipe(sdkParams.pipFd) == -1) {
193                 LOGE("Create service failed.");
194                 close(sdkParams.serverFd);
195                 return;
196             }
197             listenFd = sdkParams.pipFd[1];
198             RunServerThread(dataVec, sdkParams);
199         }
200 
RunServerThread(std::vector<std::string> &dataVec, ServerParams &params)201         void SdkDataRecv::RunServerThread(std::vector<std::string> &dataVec, ServerParams &params)
202         {
203             while (collectRunring) {
204                 SetUpFdSet(params);
205                 if (select(maxFd + 1, &readFds, nullptr, nullptr, nullptr) <= 0) {
206                     continue;
207                 }
208                 for (int i = 0; i < OH_SOCKET_MAX; i++) {
209                     HandleReceiveFd(dataVec, i, params);
210                 }
211                 HandleServerFd(params);
212             }
213             CleanUpResources(params);
214         }
215 
SetUpFdSet(ServerParams &params)216         void SdkDataRecv::SetUpFdSet(ServerParams &params)
217         {
218             FD_ZERO(&readFds);
219             FD_SET(params.serverFd, &readFds);
220             FD_SET(params.pipFd[0], &readFds);
221 
222             maxFd = std::max(params.serverFd, params.pipFd[0]);
223             for (int i = 0; i < OH_SOCKET_MAX; i++) {
224                 if (params.receiveFd[i] >= 0) {
225                     FD_SET(params.receiveFd[i], &readFds);
226                     maxFd = std::max(maxFd, params.receiveFd[i]);
227                 }
228             }
229         }
230 
HandleReceiveFd(std::vector<std::string> &dataVec, int i, ServerParams &params)231         void SdkDataRecv::HandleReceiveFd(std::vector<std::string> &dataVec, int i, ServerParams &params)
232         {
233             if (params.receiveFd[i] >= 0 && FD_ISSET(params.receiveFd[i], &readFds)) {
234                 std::string data = OhDataReceive(i, params);
235                 if (SPTask::GetInstance().GetRecordState()) {
236                     dataVec.push_back(data);
237                 }
238             }
239         }
240 
HandleServerFd(ServerParams &params)241         void SdkDataRecv::HandleServerFd(ServerParams &params)
242         {
243             if (!FD_ISSET(params.serverFd, &readFds)) {
244                 return;
245             }
246 
247             int fd = accept(params.serverFd, nullptr, nullptr);
248             if (fd < 0) {
249                 return;
250             }
251 
252             for (int i = 0; i < OH_SOCKET_MAX; i++) {
253                 if (params.receiveFd[i] < 0) {
254                     params.receiveFd[i] = fd;
255                     if (fd > maxFd) {
256                         maxFd = fd;
257                     }
258                     break;
259                 }
260             }
261         }
262 
CleanUpResources(ServerParams &params)263         void SdkDataRecv::CleanUpResources(ServerParams &params)
264         {
265             if (params.serverFd != -1) {
266                 close(params.serverFd);
267                 params.serverFd = -1;
268             }
269             if (params.pipFd[0] != -1) {
270                 close(params.pipFd[0]);
271                 params.pipFd[0] = -1;
272             }
273             for (int i = 0; i < OH_SOCKET_MAX; i++) {
274                 if (params.receiveFd[i] != -1) {
275                     close(params.receiveFd[i]);
276                     params.receiveFd[i] = -1;
277                 }
278             }
279         }
280 
GetListenFd()281         int SdkDataRecv::GetListenFd()
282         {
283             return listenFd;
284         }
SetListenFd(int fd)285         void SdkDataRecv::SetListenFd(int fd)
286         {
287             listenFd = fd;
288         }
289 
GetSdkDataRealtimeData(std::map<std::string, std::string> &dataMap)290         void SdkDataRecv::GetSdkDataRealtimeData(std::map<std::string, std::string> &dataMap)
291         {
292             if (sdkDataRealtimeData.size() > 0) {
293                 std::map<std::string, std::string> sdkDataRealtimeDataMap;
294                 sdkDataRealtimeDataMap["sdkData"] = sdkDataRealtimeData;
295                 realtimeDataLock.lock();
296                 dataMap.insert(sdkDataRealtimeDataMap.begin(), sdkDataRealtimeDataMap.end());
297                 realtimeDataLock.unlock();
298                 sdkDataRealtimeData.clear();
299             }
300         }
301 
SetStartRecordTime()302         void SdkDataRecv::SetStartRecordTime()
303         {
304             sdkParams.startTime = SPUtils::GetCurTime();
305         }
306     }
307 }