1/*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
#include <algorithm>
#include <cerrno>
#include <climits>
#include <cstdarg>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <mutex>
#include <regex>
#include <sstream>
#include <string>
#include <thread>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>
#include "include/sp_utils.h"
#include "include/startup_delay.h"
#include "include/sp_log.h"
#include "include/sdk_data_recv.h"
#include "memory_collector.h"
#include "collect_result.h"
#include "include/sp_task.h"
#include "securec.h"
39namespace OHOS {
40    namespace SmartPerf {
41        SdkDataRecv::SdkDataRecv()
42        {
43            FD_ZERO(&readFds);
44        }
45
46        std::map<std::string, std::string> SdkDataRecv::ItemData()
47        {
48            return std::map<std::string, std::string>();
49        }
50
51        int SdkDataRecv::CreateOhSocketServer(int basePort)
52        {
53            int i = 0;
54            int socketFd = 0;
55            struct sockaddr_in address;
56            const int reuse = 1;
57
58            socketFd = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
59            if (socketFd < 0) {
60                LOGE("Create socket error.");
61                return -1;
62            }
63            setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
64
65            std::fill_n(reinterpret_cast<char*>(&address), sizeof(address), 0);
66            address.sin_family = AF_INET;
67            address.sin_addr.s_addr = inet_addr("127.0.0.1");
68
69            for (i = 0; i < SOCKET_PORT_NUM_PER_TYPE; i++) {
70                address.sin_port = htons(basePort + i);
71                if (::bind(socketFd, reinterpret_cast<struct sockaddr *>(&address), sizeof(address)) == 0) {
72                    break;
73                }
74            }
75
76            if (i >= SOCKET_PORT_NUM_PER_TYPE) {
77                LOGE("Bind socket error.");
78                return -1;
79            }
80
81            if (listen(socketFd, OH_SOCKET_MAX) < 0) {
82                LOGE("Listen socket error.");
83                close(socketFd);
84                return -1;
85            }
86
87            LOGD("Listen port %d success, fd is %d", socketFd);
88            return socketFd;
89        }
90        std::string SdkDataRecv::ProcessData(std::string message, ServerParams &params)
91        {
92            std::stringstream ss(message);
93            std::string item;
94            std::string source;
95            std::string timestamp;
96            std::string eventName;
97            std::string enable;
98            std::string value;
99            std::string realTimestamp;
100            while (std::getline(ss, item, ',')) {
101                std::stringstream itemSS(item);
102                std::string first;
103                std::string second;
104                std::getline(itemSS, first, ':');
105                std::getline(itemSS, second, ':');
106                if (first == "src") {
107                    source = second;
108                } else if (first == "para0") {
109                    eventName = second;
110                } else if (first == "time") {
111                    realTimestamp = std::to_string(std::stoll(second) - SPTask::GetInstance().GetRealStartTime());
112                    timestamp = std::to_string(std::stoll(second) - params.startTime);
113                } else if (first == "enable") {
114                    enable = second;
115                } else if (first == "value") {
116                    value = second;
117                }
118            }
119            item = source + "," + timestamp + "," + eventName + "," + enable + "," + value + "\r\n";
120            sdkDataRealtimeData += source + "_" + realTimestamp + "_" + eventName + "_" + enable + "_" + value + ";";
121            return item;
122        }
123
124        std::string SdkDataRecv::OhDataReceive(int index, ServerParams &params)
125        {
126            char receiveBuf[MSG_MAX_LEN];
127            std::string resStr;
128            int readLen = 0;
129            if ((readLen = read(params.receiveFd[index], receiveBuf, MSG_MAX_LEN)) <= 0) {
130                close(params.receiveFd[index]);
131                params.receiveFd[index] = -1;
132                return "";
133            }
134            LOGI("Fd %d,%d, receove %s", index, params.receiveFd[index], receiveBuf);
135            receiveBuffer = receiveBuf;
136
137            bool processFlag = true;
138            while (processFlag) {
139                size_t start = receiveBuffer.find('{');
140                if (start == std::string::npos) {
141                    processFlag = false;
142                    break;
143                }
144
145                size_t end = receiveBuffer.find('}', start);
146                if (end == std::string::npos) {
147                    processFlag = false;
148                    break;
149                }
150
151                std::size_t startPosition = start + 1;
152                std::size_t length = end > start ? end - start - 1 : 0;
153
154                if (startPosition >= receiveBuffer.size() || length > receiveBuffer.size() - startPosition) {
155                    processFlag = false;
156                    break;
157                }
158
159                std::string message = receiveBuffer.substr(startPosition, length);
160                resStr += ProcessData(message, params);
161
162                receiveBuffer.erase(0, end + 1);
163
164                const int bufferSizeCheck = 2;
165                if (receiveBuffer.size() <= bufferSizeCheck) {
166                    processFlag = false;
167                }
168            }
169
170            if (!resStr.empty() && resStr.back() == '\n') {
171                resStr.pop_back();
172            }
173            return resStr;
174        }
175
176        void SdkDataRecv::SetRunningState(bool state)
177        {
178            collectRunring = state;
179        }
180
181        void SdkDataRecv::ServerThread(std::vector<std::string> &dataVec)
182        {
183            for (int i = 0; i < OH_SOCKET_MAX; i++) {
184                sdkParams.receiveFd[i] = -1;
185            }
186            sdkParams.startTime = SPUtils::GetCurTime();
187            sdkParams.serverFd = CreateOhSocketServer(OH_DATA_PORT);
188            if (sdkParams.serverFd < 0) {
189                LOGE("Create server failed.");
190                return;
191            }
192            if (pipe(sdkParams.pipFd) == -1) {
193                LOGE("Create service failed.");
194                close(sdkParams.serverFd);
195                return;
196            }
197            listenFd = sdkParams.pipFd[1];
198            RunServerThread(dataVec, sdkParams);
199        }
200
201        void SdkDataRecv::RunServerThread(std::vector<std::string> &dataVec, ServerParams &params)
202        {
203            while (collectRunring) {
204                SetUpFdSet(params);
205                if (select(maxFd + 1, &readFds, nullptr, nullptr, nullptr) <= 0) {
206                    continue;
207                }
208                for (int i = 0; i < OH_SOCKET_MAX; i++) {
209                    HandleReceiveFd(dataVec, i, params);
210                }
211                HandleServerFd(params);
212            }
213            CleanUpResources(params);
214        }
215
216        void SdkDataRecv::SetUpFdSet(ServerParams &params)
217        {
218            FD_ZERO(&readFds);
219            FD_SET(params.serverFd, &readFds);
220            FD_SET(params.pipFd[0], &readFds);
221
222            maxFd = std::max(params.serverFd, params.pipFd[0]);
223            for (int i = 0; i < OH_SOCKET_MAX; i++) {
224                if (params.receiveFd[i] >= 0) {
225                    FD_SET(params.receiveFd[i], &readFds);
226                    maxFd = std::max(maxFd, params.receiveFd[i]);
227                }
228            }
229        }
230
231        void SdkDataRecv::HandleReceiveFd(std::vector<std::string> &dataVec, int i, ServerParams &params)
232        {
233            if (params.receiveFd[i] >= 0 && FD_ISSET(params.receiveFd[i], &readFds)) {
234                std::string data = OhDataReceive(i, params);
235                if (SPTask::GetInstance().GetRecordState()) {
236                    dataVec.push_back(data);
237                }
238            }
239        }
240
241        void SdkDataRecv::HandleServerFd(ServerParams &params)
242        {
243            if (!FD_ISSET(params.serverFd, &readFds)) {
244                return;
245            }
246
247            int fd = accept(params.serverFd, nullptr, nullptr);
248            if (fd < 0) {
249                return;
250            }
251
252            for (int i = 0; i < OH_SOCKET_MAX; i++) {
253                if (params.receiveFd[i] < 0) {
254                    params.receiveFd[i] = fd;
255                    if (fd > maxFd) {
256                        maxFd = fd;
257                    }
258                    break;
259                }
260            }
261        }
262
263        void SdkDataRecv::CleanUpResources(ServerParams &params)
264        {
265            if (params.serverFd != -1) {
266                close(params.serverFd);
267                params.serverFd = -1;
268            }
269            if (params.pipFd[0] != -1) {
270                close(params.pipFd[0]);
271                params.pipFd[0] = -1;
272            }
273            for (int i = 0; i < OH_SOCKET_MAX; i++) {
274                if (params.receiveFd[i] != -1) {
275                    close(params.receiveFd[i]);
276                    params.receiveFd[i] = -1;
277                }
278            }
279        }
280
281        int SdkDataRecv::GetListenFd()
282        {
283            return listenFd;
284        }
285        void SdkDataRecv::SetListenFd(int fd)
286        {
287            listenFd = fd;
288        }
289
290        void SdkDataRecv::GetSdkDataRealtimeData(std::map<std::string, std::string> &dataMap)
291        {
292            if (sdkDataRealtimeData.size() > 0) {
293                std::map<std::string, std::string> sdkDataRealtimeDataMap;
294                sdkDataRealtimeDataMap["sdkData"] = sdkDataRealtimeData;
295                realtimeDataLock.lock();
296                dataMap.insert(sdkDataRealtimeDataMap.begin(), sdkDataRealtimeDataMap.end());
297                realtimeDataLock.unlock();
298                sdkDataRealtimeData.clear();
299            }
300        }
301
302        void SdkDataRecv::SetStartRecordTime()
303        {
304            sdkParams.startTime = SPUtils::GetCurTime();
305        }
306    }
307}