1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "pbreader_parser.h"
17 #include <unistd.h>
18 #include "app_start_filter.h"
19 #include "binder_filter.h"
20 #include "common_types.pbreader.h"
21 #include "cpu_filter.h"
22 #include "data_area.h"
23 #ifdef ENABLE_HTRACE
24 #include "ftrace_event.pbreader.h"
25 #include "trace_plugin_result.pbreader.h"
26 #endif
27 #ifdef ENABLE_MEMORY
28 #include "memory_plugin_result.pbreader.h"
29 #endif
30 #include "stat_filter.h"
31 #if IS_WASM
32 #include "wasm_func.h"
33 #endif
34 namespace SysTuning {
35 namespace TraceStreamer {
PbreaderParser(TraceDataCache *dataCache, const TraceStreamerFilters *filters)36 PbreaderParser::PbreaderParser(TraceDataCache *dataCache, const TraceStreamerFilters *filters)
37     : ParserBase(filters),
38       pbreaderClockDetailParser_(std::make_unique<PbreaderClockDetailParser>(dataCache, filters)),
39 #ifdef ENABLE_HTRACE
40       htraceCpuDetailParser_(std::make_unique<HtraceCpuDetailParser>(dataCache, filters)),
41       htraceSymbolsDetailParser_(std::make_unique<HtraceSymbolsDetailParser>(dataCache, filters)),
42 #endif
43 #ifdef ENABLE_FFRT
44       pbreaderFfrtParser_(
45           std::make_unique<PbreaderFfrtDetailParser>(dataCache, filters, htraceCpuDetailParser_->eventParser_.get())),
46 #endif
47 #ifdef ENABLE_MEMORY
48       pbreaderMemParser_(std::make_unique<PbreaderMemParser>(dataCache, filters)),
49 #endif
50 #ifdef ENABLE_HILOG
51       pbreaderHiLogParser_(std::make_unique<PbreaderHiLogParser>(dataCache, filters)),
52 #endif
53 #ifdef ENABLE_NATIVE_HOOK
54       pbreaderNativeHookParser_(std::make_unique<PbreaderNativeHookParser>(dataCache, filters)),
55 #endif
56 #ifdef ENABLE_HTDUMP
57       pbreaderHidumpParser_(std::make_unique<PbreaderHidumpParser>(dataCache, filters)),
58 #endif
59 #ifdef ENABLE_CPUDATA
60       cpuUsageParser_(std::make_unique<PbreaderCpuDataParser>(dataCache, filters)),
61 #endif
62 #ifdef ENABLE_NETWORK
63       networkParser_(std::make_unique<PbreaderNetworkParser>(dataCache, filters)),
64 #endif
65 #ifdef ENABLE_DISKIO
66       diskIOParser_(std::make_unique<PbreaderDiskIOParser>(dataCache, filters)),
67 #endif
68 #ifdef ENABLE_PROCESS
69       processParser_(std::make_unique<PbreaderProcessParser>(dataCache, filters)),
70 #endif
71 #ifdef ENABLE_HISYSEVENT
72       hisyseventParser_(std::make_unique<PbreaderHisyseventParser>(dataCache, filters)),
73 #endif
74 #ifdef ENABLE_ARKTS
75       jsMemoryParser_(std::make_unique<PbreaderJSMemoryParser>(dataCache, filters)),
76 #endif
77 #ifdef ENABLE_HIPERF
78       perfDataParser_(std::make_unique<PerfDataParser>(dataCache, filters)),
79 #endif
80 #ifdef ENABLE_EBPF
81       ebpfDataParser_(std::make_unique<EbpfDataParser>(dataCache, filters)),
82 #endif
83 #ifdef ENABLE_STREAM_EXTEND
84       pbreaderStreamParser_(std::make_unique<PbreaderStreamParser>(dataCache, filters)),
85 #endif
86       xpowerParser_(std::make_unique<PbreaderXpowerParser>(dataCache, filters)),
87       traceDataCache_(dataCache)
88 {
89     InitPluginNameIndex();
90     if (traceDataCache_->supportThread_) {
91         dataSegArray_ = std::make_unique<PbreaderDataSegment[]>(maxSegArraySize);
92     } else {
93         dataSegArray_ = std::make_unique<PbreaderDataSegment[]>(1);
94     }
95 }
InitHookPluginNameIndex()96 inline void PbreaderParser::InitHookPluginNameIndex()
97 {
98 #ifdef ENABLE_NATIVE_HOOK
99     nativeHookPluginIndex_.insert(traceDataCache_->GetDataIndex("nativehook"));
100     nativeHookPluginIndex_.insert(traceDataCache_->GetDataIndex("hookdaemon"));
101     nativeHookConfigIndex_ = traceDataCache_->GetDataIndex("nativehook_config");
102     supportPluginNameIndex_.insert(nativeHookPluginIndex_.begin(), nativeHookPluginIndex_.end());
103     supportPluginNameIndex_.insert(nativeHookConfigIndex_);
104 #endif
105 }
InitMemoryPluginNameIndex()106 inline void PbreaderParser::InitMemoryPluginNameIndex()
107 {
108 #ifdef ENABLE_MEMORY
109     memPluginIndex_ = traceDataCache_->GetDataIndex("memory-plugin");
110     memoryPluginConfigIndex_ = traceDataCache_->GetDataIndex("memory-plugin_config");
111     supportPluginNameIndex_.insert(memPluginIndex_);
112     supportPluginNameIndex_.insert(memoryPluginConfigIndex_);
113 #endif
114 }
InitHiPluginNameIndex()115 inline void PbreaderParser::InitHiPluginNameIndex()
116 {
117 #ifdef ENABLE_HTDUMP
118     hidumpPluginIndex_.insert(traceDataCache_->GetDataIndex("hidump-plugin"));
119     hidumpPluginIndex_.insert(traceDataCache_->GetDataIndex("/data/local/tmp/libhidumpplugin.z.so"));
120     supportPluginNameIndex_.insert(hidumpPluginIndex_.begin(), hidumpPluginIndex_.end());
121 #endif
122 #ifdef ENABLE_HILOG
123     hilogPluginIndex_.insert(traceDataCache_->GetDataIndex("hilog-plugin"));
124     hilogPluginIndex_.insert(traceDataCache_->GetDataIndex("/data/local/tmp/libhilogplugin.z.so"));
125     supportPluginNameIndex_.insert(hilogPluginIndex_.begin(), hilogPluginIndex_.end());
126 #endif
127 #ifdef ENABLE_HISYSEVENT
128     hisyseventPluginIndex_ = traceDataCache_->GetDataIndex("hisysevent-plugin");
129     hisyseventPluginConfigIndex_ = traceDataCache_->GetDataIndex("hisysevent-plugin_config");
130     supportPluginNameIndex_.insert(hisyseventPluginIndex_);
131     supportPluginNameIndex_.insert(hisyseventPluginConfigIndex_);
132 #endif
133 }
InitPluginNameIndex()134 void PbreaderParser::InitPluginNameIndex()
135 {
136 #ifdef ENABLE_PROCESS
137     processPluginIndex_ = traceDataCache_->GetDataIndex("process-plugin");
138     supportPluginNameIndex_.insert(processPluginIndex_);
139 #endif
140 #ifdef ENABLE_DISKIO
141     diskioPluginIndex_ = traceDataCache_->GetDataIndex("diskio-plugin");
142     supportPluginNameIndex_.insert(diskioPluginIndex_);
143 #endif
144     InitMemoryPluginNameIndex();
145     InitHiPluginNameIndex();
146 #ifdef ENABLE_CPUDATA
147     cpuPluginIndex_ = traceDataCache_->GetDataIndex("cpu-plugin");
148     supportPluginNameIndex_.insert(cpuPluginIndex_);
149 #endif
150 #ifdef ENABLE_NETWORK
151     networkPluginIndex_ = traceDataCache_->GetDataIndex("network-plugin");
152     supportPluginNameIndex_.insert(networkPluginIndex_);
153 #endif
154     InitHookPluginNameIndex();
155 #ifdef ENABLE_ARKTS
156     arktsPluginIndex_ = traceDataCache_->GetDataIndex("arkts-plugin");
157     arktsPluginConfigIndex_ = traceDataCache_->GetDataIndex("arkts-plugin_config");
158     supportPluginNameIndex_.insert(arktsPluginIndex_);
159     supportPluginNameIndex_.insert(arktsPluginConfigIndex_);
160 #endif
161 #ifdef ENABLE_HTRACE
162     ftracePluginIndex_.insert(traceDataCache_->GetDataIndex("ftrace-plugin"));
163     ftracePluginIndex_.insert(traceDataCache_->GetDataIndex("/data/local/tmp/libftrace_plugin.z.so"));
164     supportPluginNameIndex_.insert(ftracePluginIndex_.begin(), ftracePluginIndex_.end());
165 #endif
166 #ifdef ENABLE_FFRT
167     ffrtPluginIndex_ = traceDataCache_->GetDataIndex("ffrt-profiler");
168     ffrtPluginConfigIndex_ = traceDataCache_->GetDataIndex("ffrt-profiler_config");
169     supportPluginNameIndex_.insert(ffrtPluginIndex_);
170     supportPluginNameIndex_.insert(ffrtPluginConfigIndex_);
171 #endif
172 #ifdef ENABLE_STREAM_EXTEND
173     streamPluginIndex_ = traceDataCache_->GetDataIndex("stream-plugin");
174     supportPluginNameIndex_.insert(streamPluginIndex_);
175 #endif
176     xpowerPluginIndex_ = traceDataCache_->GetDataIndex("xpower-plugin");
177     supportPluginNameIndex_.insert(xpowerPluginIndex_);
178 }
179 
180 #if defined(ENABLE_HIPERF) || defined(ENABLE_NATIVE_HOOK) || defined(ENABLE_EBPF)
ParserFileSO(std::string &directory, const std::vector<std::string> &relativeFilePaths)181 void PbreaderParser::ParserFileSO(std::string &directory, const std::vector<std::string> &relativeFilePaths)
182 {
183     for (const auto &filePath : relativeFilePaths) {
184         auto symbolsFile = OHOS::Developtools::HiPerf::SymbolsFile::CreateSymbolsFile(SYMBOL_ELF_FILE, filePath);
185         symbolsFile->setSymbolsFilePath(directory);
186         auto res = symbolsFile->LoadSymbols(nullptr, filePath);
187         if (!res) {
188             continue;
189         }
190         symbolsFiles_.emplace_back(std::move(symbolsFile));
191     }
192 }
193 #endif
194 
// Destructor: releases nothing explicitly; it only logs a reminder of which clock id
// corresponds to RealTime and which to BootTime.
PbreaderParser::~PbreaderParser()
{
    TS_LOGI("clockid 2 is for RealTime and 1 is for BootTime");
}
199 
ReparseSymbolFilesAndResymbolization(std::string &symbolsPath, std::vector<std::string> &symbolsPaths)200 bool PbreaderParser::ReparseSymbolFilesAndResymbolization(std::string &symbolsPath,
201                                                           std::vector<std::string> &symbolsPaths)
202 {
203     auto parseStatus = false;
204 #if defined(ENABLE_HIPERF) || defined(ENABLE_NATIVE_HOOK) || defined(ENABLE_EBPF)
205     ParserFileSO(symbolsPath, symbolsPaths);
206 #endif
207 #ifdef ENABLE_HIPERF
208     if (traceDataCache_->GetPerfFilesData()->Size() > 0) {
209         perfDataParser_->PerfReloadSymbolFiles(symbolsFiles_);
210         parseStatus = true;
211     }
212 #endif
213 #ifdef ENABLE_NATIVE_HOOK
214     if (traceDataCache_->GetNativeHookFrameData()->Size() > 0) {
215         pbreaderNativeHookParser_->NativeHookReloadElfSymbolTable(symbolsFiles_);
216         parseStatus = true;
217     }
218 #endif
219 #ifdef ENABLE_EBPF
220     if (traceDataCache_->GetEbpfCallStack()->Size() > 0) {
221         ebpfDataParser_->EBPFReloadElfSymbolTable(symbolsFiles_);
222         parseStatus = true;
223     }
224 #endif
225     symbolsFiles_.clear();
226     return parseStatus;
227 }
228 
WaitForHPluginParserEnd()229 inline void PbreaderParser::WaitForHPluginParserEnd()
230 {
231 #ifdef ENABLE_HTRACE
232     htraceCpuDetailParser_->FilterAllEvents();
233     traceDataCache_->GetDataSourceClockIdData()->SetDataSourceClockId(DATA_SOURCE_TYPE_TRACE,
234                                                                       dataSourceTypeTraceClockid_);
235 #endif
236 #ifdef ENABLE_HILOG
237     pbreaderHiLogParser_->Finish();
238     traceDataCache_->GetDataSourceClockIdData()->SetDataSourceClockId(DATA_SOURCE_TYPE_HILOG,
239                                                                       dataSourceTypeHilogClockid_);
240 #endif
241 #ifdef ENABLE_HTDUMP
242     pbreaderHidumpParser_->Finish();
243     traceDataCache_->GetDataSourceClockIdData()->SetDataSourceClockId(DATA_SOURCE_TYPE_FPS, dataSourceTypeFpsClockid_);
244 #endif
245 #ifdef ENABLE_HISYSEVENT
246     hisyseventParser_->Finish();
247     traceDataCache_->GetDataSourceClockIdData()->SetDataSourceClockId(DATA_SOURCE_TYPE_HISYSEVENT,
248                                                                       dataSourceTypeHisyseventClockid_);
249 #endif
250 }
251 
WaitForOtherPluginParserEnd()252 inline void PbreaderParser::WaitForOtherPluginParserEnd()
253 {
254 #ifdef ENABLE_NATIVE_HOOK
255     pbreaderNativeHookParser_->FinishParseNativeHookData();
256     pbreaderNativeHookParser_->Finish();
257     traceDataCache_->GetDataSourceClockIdData()->SetDataSourceClockId(DATA_SOURCE_TYPE_NATIVEHOOK,
258                                                                       dataSourceTypeNativeHookClockid_);
259 #endif
260 #ifdef ENABLE_CPUDATA
261     cpuUsageParser_->Finish();
262     traceDataCache_->GetDataSourceClockIdData()->SetDataSourceClockId(DATA_SOURCE_TYPE_CPU, dataSourceTypeCpuClockid_);
263 #endif
264 #ifdef ENABLE_NETWORK
265     networkParser_->Finish();
266     traceDataCache_->GetDataSourceClockIdData()->SetDataSourceClockId(DATA_SOURCE_TYPE_NETWORK,
267                                                                       dataSourceTypeNetworkClockid_);
268 #endif
269 #ifdef ENABLE_PROCESS
270     processParser_->Finish();
271     traceDataCache_->GetDataSourceClockIdData()->SetDataSourceClockId(DATA_SOURCE_TYPE_PROCESS,
272                                                                       dataSourceTypeProcessClockid_);
273 #endif
274 #ifdef ENABLE_DISKIO
275     diskIOParser_->Finish();
276     traceDataCache_->GetDataSourceClockIdData()->SetDataSourceClockId(DATA_SOURCE_TYPE_DISKIO,
277                                                                       dataSourceTypeDiskioClockid_);
278 #endif
279 #ifdef ENABLE_ARKTS
280     jsMemoryParser_->Finish();
281     traceDataCache_->GetDataSourceClockIdData()->SetDataSourceClockId(DATA_SOURCE_TYPE_JSMEMORY,
282                                                                       dataSourceTypeJSMemoryClockid_);
283 #endif
284 #ifdef ENABLE_EBPF
285     ebpfDataParser_->Finish(); // keep final upate perf and ebpf data time range
286 #endif
287 #ifdef ENABLE_HIPERF
288     perfDataParser_->Finish();
289 #endif
290 #ifdef ENABLE_MEMORY
291     pbreaderMemParser_->Finish();
292     traceDataCache_->GetDataSourceClockIdData()->SetDataSourceClockId(DATA_SOURCE_TYPE_MEM, dataSourceTypeMemClockid_);
293 #endif
294 }
295 
WaitForParserEnd()296 void PbreaderParser::WaitForParserEnd()
297 {
298     if (parseThreadStarted_ || filterThreadStarted_) {
299         toExit_ = true;
300         while (!exited_) {
301             usleep(sleepDur_ * sleepDur_);
302         }
303     }
304     hasGotHeader_ = false;
305     WaitForHPluginParserEnd();
306     WaitForOtherPluginParserEnd();
307 #if defined(ENABLE_HTRACE) && defined(ENABLE_NATIVE_HOOK) && defined(ENABLE_HIPERF)
308     ParseNapiAsync();
309 #endif
310     traceDataCache_->GetDataSourceClockIdData()->Finish();
311     dataSegArray_.reset();
312     processedDataLen_ = 0;
313 }
314 
ParseTraceDataItem(const std::string &buffer)315 void PbreaderParser::ParseTraceDataItem(const std::string &buffer)
316 {
317     int32_t head = rawDataHead_;
318     if (!traceDataCache_->supportThread_ || traceDataCache_->isSplitFile_) {
319         dataSegArray_[head].seg = std::make_shared<std::string>(std::move(buffer));
320         dataSegArray_[head].status = TS_PARSE_STATUS_SEPRATED;
321         ParserData(dataSegArray_[head], traceDataCache_->isSplitFile_);
322         return;
323     }
324     while (!toExit_) {
325         if (dataSegArray_[head].status.load() != TS_PARSE_STATUS_INIT) {
326             usleep(sleepDur_);
327             continue;
328         }
329         dataSegArray_[head].seg = std::make_shared<std::string>(std::move(buffer));
330         dataSegArray_[head].status = TS_PARSE_STATUS_SEPRATED;
331         rawDataHead_ = (rawDataHead_ + 1) % maxSegArraySize;
332         break;
333     }
334     if (!parseThreadStarted_) {
335         parseThreadStarted_ = true;
336         int32_t tmp = traceDataCache_->parserThreadNum_;
337         while (tmp--) {
338             parserThreadCount_++;
339             std::thread ParseTypeThread(&PbreaderParser::ParseThread, this);
340             ParseTypeThread.detach();
341             TS_LOGI("parser Thread:%d/%d start working ...\n", traceDataCache_->parserThreadNum_ - tmp,
342                     traceDataCache_->parserThreadNum_);
343         }
344     }
345     if (!filterThreadStarted_) {
346         filterThreadStarted_ = true;
347         std::thread FilterTypeThread(&PbreaderParser::FilterThread, this);
348         TS_LOGI("FilterThread start working ...");
349         FilterTypeThread.detach();
350     }
351 }
352 
353 #ifdef ENABLE_ARKTS
// Forwards the `enabled` flag to the JS memory parser's EnableSaveFile().
// Only available when ENABLE_ARKTS is defined.
void PbreaderParser::EnableFileSeparate(bool enabled)
{
    jsMemoryParser_->EnableSaveFile(enabled);
}
358 #endif
FilterData(PbreaderDataSegment &seg, bool isSplitFile)359 void PbreaderParser::FilterData(PbreaderDataSegment &seg, bool isSplitFile)
360 {
361     bool haveSplitSeg = false;
362     if (seg.dataType == DATA_SOURCE_TYPE_TRACE) {
363 #ifdef ENABLE_HTRACE
364         htraceCpuDetailParser_->FilterAllEventsReader();
365 #endif
366     }
367 #ifdef ENABLE_FFRT
368     else if (seg.dataType == DATA_SOURCE_TYPE_FFRT) {
369         pbreaderFfrtParser_->FilterAllEventsReader();
370     }
371 #endif
372 #ifdef ENABLE_NATIVE_HOOK
373     else if (seg.dataType == DATA_SOURCE_TYPE_NATIVEHOOK) {
374         pbreaderNativeHookParser_->Parse(seg, haveSplitSeg);
375     } else if (seg.dataType == DATA_SOURCE_TYPE_NATIVEHOOK_CONFIG) {
376         pbreaderNativeHookParser_->ParseConfigInfo(seg);
377     }
378 #endif
379 #ifdef ENABLE_MEMORY
380     else if (seg.dataType == DATA_SOURCE_TYPE_MEM) {
381         pbreaderMemParser_->Parse(seg, seg.timeStamp, seg.clockId);
382     } else if (seg.dataType == DATA_SOURCE_TYPE_MEM_CONFIG) {
383         pbreaderMemParser_->ParseMemoryConfig(seg);
384     }
385 #endif
386 #ifdef ENABLE_HILOG
387     else if (seg.dataType == DATA_SOURCE_TYPE_HILOG) {
388         pbreaderHiLogParser_->Parse(seg.protoData, haveSplitSeg);
389     }
390 #endif
391 #ifdef ENABLE_CPUDATA
392     else if (seg.dataType == DATA_SOURCE_TYPE_CPU) {
393         cpuUsageParser_->Parse(seg.protoData, seg.timeStamp);
394     }
395 #endif
396 #ifdef ENABLE_HTDUMP
397     else if (seg.dataType == DATA_SOURCE_TYPE_FPS) {
398         pbreaderHidumpParser_->Parse(seg.protoData);
399         dataSourceTypeFpsClockid_ = pbreaderHidumpParser_->ClockId();
400     }
401 #endif
402 #ifdef ENABLE_NETWORK
403     else if (seg.dataType == DATA_SOURCE_TYPE_NETWORK) {
404         networkParser_->Parse(seg.protoData, seg.timeStamp);
405     }
406 #endif
407 #ifdef ENABLE_PROCESS
408     else if (seg.dataType == DATA_SOURCE_TYPE_PROCESS) {
409         processParser_->Parse(seg.protoData, seg.timeStamp);
410     }
411 #endif
412 #ifdef ENABLE_DISKIO
413     else if (seg.dataType == DATA_SOURCE_TYPE_DISKIO) {
414         diskIOParser_->Parse(seg.protoData, seg.timeStamp);
415     }
416 #endif
417 #ifdef ENABLE_ARKTS
418     else if (seg.dataType == DATA_SOURCE_TYPE_JSMEMORY) {
419         jsMemoryParser_->Parse(seg.protoData, seg.timeStamp, traceDataCache_->SplitFileMinTime(),
420                                traceDataCache_->SplitFileMaxTime(), profilerPluginData_);
421     } else if (seg.dataType == DATA_SOURCE_TYPE_JSMEMORY_CONFIG) {
422         jsMemoryParser_->ParseJSMemoryConfig(seg.protoData);
423     }
424 #endif
425 #ifdef ENABLE_HISYSEVENT
426     else if (seg.dataType == DATA_SOURCE_TYPE_HISYSEVENT) {
427         ProtoReader::HisyseventInfo_Reader hisyseventInfo(seg.protoData.data_, seg.protoData.size_);
428         hisyseventParser_->Parse(&hisyseventInfo, seg.timeStamp, haveSplitSeg);
429     } else if (seg.dataType == DATA_SOURCE_TYPE_HISYSEVENT_CONFIG) {
430         ProtoReader::HisyseventConfig_Reader hisyseventConfig(seg.protoData.data_, seg.protoData.size_);
431         hisyseventParser_->Parse(&hisyseventConfig, seg.timeStamp);
432     }
433 #endif
434 #ifdef ENABLE_STREAM_EXTEND
435     else if (seg.dataType == DATA_SOURCE_TYPE_STREAM) {
436         pbreaderStreamParser_->Parse(seg);
437     }
438 #endif
439     else if (seg.dataType == DATA_SOURCE_TYPE_XPOWER) {
440         xpowerParser_->Parse(seg, seg.timeStamp, seg.clockId);
441     }
442     if (traceDataCache_->isSplitFile_ && haveSplitSeg) {
443         mPbreaderSplitData_.emplace(splitFileOffset_, nextLength_ + packetSegLength_);
444     }
445     if (traceDataCache_->supportThread_ && !traceDataCache_->isSplitFile_) {
446         filterHead_ = (filterHead_ + 1) % maxSegArraySize;
447     }
448     seg.status = TS_PARSE_STATUS_INIT;
449 }
FilterThread()450 void PbreaderParser::FilterThread()
451 {
452     TS_LOGI("filter thread start work!");
453     while (true) {
454         PbreaderDataSegment &seg = dataSegArray_[filterHead_];
455         if (seg.status.load() == TS_PARSE_STATUS_INVALID) {
456             seg.status = TS_PARSE_STATUS_INIT;
457             filterHead_ = (filterHead_ + 1) % maxSegArraySize;
458             TS_LOGD("seprateHead_d:\t%d, parseHead_:\t%d, filterHead_:\t%d\n", rawDataHead_, parseHead_, filterHead_);
459             continue;
460         }
461         if (seg.status.load() != TS_PARSE_STATUS_PARSED) {
462             if (toExit_ && !parserThreadCount_) {
463                 TS_LOGI("exiting Filter Thread");
464                 exited_ = true;
465                 filterThreadStarted_ = false;
466                 TS_LOGI("seprateHead:\t%d, parseHead_:\t%d, filterHead_:\t%d, status:%d\n", rawDataHead_, parseHead_,
467                         filterHead_, seg.status.load());
468                 return;
469             }
470             TS_LOGD("seprateHead:\t%d, parseHead_:\t%d, filterHead_:\t%d, status:%d\n", rawDataHead_, parseHead_,
471                     filterHead_, seg.status.load());
472             usleep(sleepDur_);
473             continue;
474         }
475         FilterData(seg, false);
476     }
477 }
478 
SpliteConfigData(const std::string &pluginName, const PbreaderDataSegment &dataSeg)479 bool PbreaderParser::SpliteConfigData(const std::string &pluginName, const PbreaderDataSegment &dataSeg)
480 {
481     if (EndWith(pluginName, "arkts-plugin_config")) {
482         std::string dataString(dataSeg.seg->c_str(), dataSeg.seg->length());
483         arkTsConfigData_ = lenBuffer_ + dataString;
484         return true;
485     } else if (EndWith(pluginName, "config")) {
486         mPbreaderSplitData_.emplace(splitFileOffset_, nextLength_ + packetSegLength_);
487         return true;
488     }
489     return false;
490 }
491 
SpliteDataBySegment(DataIndex pluginNameIndex, PbreaderDataSegment &dataSeg)492 bool PbreaderParser::SpliteDataBySegment(DataIndex pluginNameIndex, PbreaderDataSegment &dataSeg)
493 {
494     bool isOtherPlugin = false;
495 #ifdef ENABLE_HTRACE
496     isOtherPlugin = isOtherPlugin || ftracePluginIndex_.count(pluginNameIndex);
497 #endif
498 #ifdef ENABLE_FFRT
499     isOtherPlugin = isOtherPlugin || (ffrtPluginIndex_ == pluginNameIndex);
500 #endif
501 #ifdef ENABLE_HISYSEVENT
502     isOtherPlugin = isOtherPlugin || (hisyseventPluginIndex_ == pluginNameIndex);
503 #endif
504 #ifdef ENABLE_NATIVE_HOOK
505     isOtherPlugin = isOtherPlugin || nativeHookPluginIndex_.count(pluginNameIndex);
506 #endif
507 #ifdef ENABLE_HILOG
508     isOtherPlugin = isOtherPlugin || hilogPluginIndex_.count(pluginNameIndex);
509 #endif
510     if (isOtherPlugin) {
511         return false;
512     }
513     // need convert to Primary Time Plugin
514 #ifdef ENABLE_MEMORY
515     if (pluginNameIndex == memPluginIndex_) {
516         dataSeg.timeStamp = streamFilters_->clockFilter_->ToPrimaryTraceTime(TS_CLOCK_REALTIME, dataSeg.timeStamp);
517         UpdatePluginTimeRange(TS_CLOCK_BOOTTIME, dataSeg.timeStamp, dataSeg.timeStamp);
518     }
519 #endif
520     if (dataSeg.timeStamp >= traceDataCache_->SplitFileMinTime() &&
521         dataSeg.timeStamp <= traceDataCache_->SplitFileMaxTime()) {
522         mPbreaderSplitData_.emplace(splitFileOffset_, nextLength_ + packetSegLength_);
523     }
524     if (pluginNameIndex == arktsPluginConfigIndex_ || pluginNameIndex == arktsPluginIndex_) {
525         return false;
526     }
527     return true;
528 }
ParseDataByPluginName(PbreaderDataSegment &dataSeg, DataIndex pulginNameIndex, const ProtoReader::ProfilerPluginData_Reader &pluginDataZero, bool isSplitFile)529 void PbreaderParser::ParseDataByPluginName(PbreaderDataSegment &dataSeg,
530                                            DataIndex pulginNameIndex,
531                                            const ProtoReader::ProfilerPluginData_Reader &pluginDataZero,
532                                            bool isSplitFile)
533 {
534     if (ftracePluginIndex_.count(pulginNameIndex)) { // ok
535 #ifdef ENABLE_HTRACE
536         ParseFtrace(dataSeg);
537 #endif
538     }
539 #ifdef ENABLE_FFRT
540     else if (ffrtPluginIndex_ == pulginNameIndex) {
541         ParseFfrt(dataSeg);
542     } else if (ffrtPluginConfigIndex_ == pulginNameIndex) {
543         ParseFfrtConfig(dataSeg);
544     }
545 #endif
546 #ifdef ENABLE_NATIVE_HOOK
547     else if (nativeHookPluginIndex_.count(pulginNameIndex)) {
548         ParseNativeHook(dataSeg, isSplitFile);
549     } else if (pulginNameIndex == nativeHookConfigIndex_) {
550         ParseNativeHookConfig(dataSeg);
551     }
552 #endif
553 #ifdef ENABLE_MEMORY
554     else if (pulginNameIndex == memPluginIndex_) {
555         ParseMemory(pluginDataZero, dataSeg);
556     } else if (pulginNameIndex == memoryPluginConfigIndex_) {
557         ParseMemoryConfig(dataSeg, pluginDataZero);
558     }
559 #endif
560 #ifdef ENABLE_HILOG
561     else if (hilogPluginIndex_.count(pulginNameIndex)) {
562         ParseHilog(dataSeg);
563     }
564 #endif
565 #ifdef ENABLE_HTDUMP
566     else if (hidumpPluginIndex_.count(pulginNameIndex)) {
567         ParseFPS(dataSeg);
568     }
569 #endif
570 #ifdef ENABLE_CPUDATA
571     else if (pulginNameIndex == cpuPluginIndex_) {
572         ParseCpuUsage(dataSeg);
573     }
574 #endif
575 #ifdef ENABLE_NETWORK
576     else if (pulginNameIndex == networkPluginIndex_) {
577         ParseNetwork(dataSeg);
578     }
579 #endif
580 #ifdef ENABLE_DISKIO
581     else if (pulginNameIndex == diskioPluginIndex_) {
582         ParseDiskIO(dataSeg);
583     }
584 #endif
585 #ifdef ENABLE_PROCESS
586     else if (pulginNameIndex == processPluginIndex_) {
587         ParseProcess(dataSeg);
588     }
589 #endif
590 #ifdef ENABLE_HISYSEVENT
591     else if (pulginNameIndex == hisyseventPluginIndex_) {
592         ParseHisysevent(dataSeg);
593     } else if (pulginNameIndex == hisyseventPluginConfigIndex_) {
594         ParseHisyseventConfig(dataSeg);
595     }
596 #endif
597 #ifdef ENABLE_ARKTS
598     else if (pulginNameIndex == arktsPluginIndex_) {
599         ParseJSMemory(pluginDataZero, dataSeg, isSplitFile);
600     } else if (pulginNameIndex == arktsPluginConfigIndex_) {
601         ParseJSMemoryConfig(dataSeg);
602     }
603 #endif
604 #ifdef ENABLE_STREAM_EXTEND
605     else if (pulginNameIndex == streamPluginIndex_) { // for trace extend demo
606         ParseStream(dataSeg);
607     }
608 #endif
609     else if (pulginNameIndex == xpowerPluginIndex_) {
610         ParseXpower(dataSeg);
611     }
612 }
613 
ParserData(PbreaderDataSegment &dataSeg, bool isSplitFile)614 void PbreaderParser::ParserData(PbreaderDataSegment &dataSeg, bool isSplitFile)
615 {
616     ProtoReader::ProfilerPluginData_Reader pluginDataZero(reinterpret_cast<const uint8_t *>(dataSeg.seg->c_str()),
617                                                           dataSeg.seg->length());
618     if (!pluginDataZero.has_name()) {
619         return;
620     }
621     auto pluginName = pluginDataZero.name().ToStdString();
622     auto pluginNameIndex = traceDataCache_->GetDataIndex(pluginName);
623     if (isSplitFile && SpliteConfigData(pluginName, dataSeg)) {
624         return;
625     }
626     if (pluginDataZero.has_tv_sec() && pluginDataZero.has_tv_nsec()) {
627         dataSeg.timeStamp = pluginDataZero.tv_sec() * SEC_TO_NS + pluginDataZero.tv_nsec();
628     }
629 
630     if (isSplitFile && SpliteDataBySegment(pluginNameIndex, dataSeg)) {
631         return;
632     }
633     if (supportPluginNameIndex_.count(pluginNameIndex)) {
634         dataSeg.protoData = pluginDataZero.data();
635         ParseDataByPluginName(dataSeg, pluginNameIndex, pluginDataZero, isSplitFile);
636     } else {
637 #if IS_WASM
638         TraceStreamerPluginOutFilter(reinterpret_cast<const char *>(pluginDataZero.data().data_),
639                                      pluginDataZero.data().size_, pluginName);
640 #endif
641         dataSeg.status = TS_PARSE_STATUS_INVALID;
642         streamFilters_->statFilter_->IncreaseStat(TRACE_EVENT_OTHER, STAT_EVENT_DATA_INVALID);
643         return;
644     }
645     if (!traceDataCache_->supportThread_ || traceDataCache_->isSplitFile_) {
646         FilterData(dataSeg, isSplitFile);
647     }
648 }
ParseThread()649 void PbreaderParser::ParseThread()
650 {
651     TS_LOGI("parser thread start work!\n");
652     while (true) {
653         int32_t head = GetNextSegment();
654         if (head < 0) {
655             if (head == ERROR_CODE_EXIT) {
656                 TS_LOGI("parse thread exit");
657                 return;
658             } else if (head == ERROR_CODE_NODATA) {
659                 continue;
660             }
661         }
662         PbreaderDataSegment &dataSeg = dataSegArray_[head];
663         ParserData(dataSeg, false);
664     }
665 }
666 
667 #ifdef ENABLE_MEMORY
ParseMemory(const ProtoReader::ProfilerPluginData_Reader &pluginDataZero, PbreaderDataSegment &dataSeg)668 void PbreaderParser::ParseMemory(const ProtoReader::ProfilerPluginData_Reader &pluginDataZero,
669                                  PbreaderDataSegment &dataSeg)
670 {
671     BuiltinClocks clockId = TS_CLOCK_REALTIME;
672     dataSourceTypeMemClockid_ = clockId;
673     dataSeg.dataType = DATA_SOURCE_TYPE_MEM;
674     dataSeg.clockId = clockId;
675     dataSeg.status = TS_PARSE_STATUS_PARSED;
676 }
ParseMemoryConfig(PbreaderDataSegment &dataSeg, const ProtoReader::ProfilerPluginData_Reader &pluginDataZero)677 void PbreaderParser::ParseMemoryConfig(PbreaderDataSegment &dataSeg,
678                                        const ProtoReader::ProfilerPluginData_Reader &pluginDataZero)
679 {
680     if (pluginDataZero.has_sample_interval()) {
681         uint32_t sampleInterval = pluginDataZero.sample_interval();
682         traceDataCache_->GetTraceConfigData()->AppendNewData("memory_config", "sample_interval",
683                                                              std::to_string(sampleInterval));
684     }
685     dataSeg.dataType = DATA_SOURCE_TYPE_MEM_CONFIG;
686     dataSeg.status = TS_PARSE_STATUS_PARSED;
687 }
688 #endif
689 #ifdef ENABLE_HILOG
ParseHilog(PbreaderDataSegment &dataSeg)690 void PbreaderParser::ParseHilog(PbreaderDataSegment &dataSeg)
691 {
692     dataSeg.dataType = DATA_SOURCE_TYPE_HILOG;
693     dataSourceTypeHilogClockid_ = TS_CLOCK_REALTIME;
694     dataSeg.status = TS_PARSE_STATUS_PARSED;
695 }
696 #endif
697 #ifdef ENABLE_NATIVE_HOOK
// Classifies the segment as native-hook config and marks it PARSED; the config payload is
// decoded later by FilterData().
void PbreaderParser::ParseNativeHookConfig(PbreaderDataSegment &dataSeg)
{
    dataSeg.dataType = DATA_SOURCE_TYPE_NATIVEHOOK_CONFIG;
    // Set the status last: FilterThread() picks the segment up once it is PARSED.
    dataSeg.status = TS_PARSE_STATUS_PARSED;
}
ParseNativeHook(PbreaderDataSegment &dataSeg, bool isSplitFile)703 void PbreaderParser::ParseNativeHook(PbreaderDataSegment &dataSeg, bool isSplitFile)
704 {
705     dataSourceTypeNativeHookClockid_ = TS_CLOCK_REALTIME;
706     dataSeg.dataType = DATA_SOURCE_TYPE_NATIVEHOOK;
707     dataSeg.status = TS_PARSE_STATUS_PARSED;
708     if (isSplitFile) {
709         dataSourceType_ = DATA_SOURCE_TYPE_NATIVEHOOK;
710     }
711 }
712 #endif
713 
714 #ifdef ENABLE_HTRACE
ParseFtrace(PbreaderDataSegment &dataSeg)715 void PbreaderParser::ParseFtrace(PbreaderDataSegment &dataSeg)
716 {
717     dataSeg.dataType = DATA_SOURCE_TYPE_TRACE;
718     ProtoReader::TracePluginResult_Reader tracePluginResult(dataSeg.protoData);
719     if (tracePluginResult.has_ftrace_cpu_stats()) {
720         auto cpuStats = *tracePluginResult.ftrace_cpu_stats();
721         ProtoReader::FtraceCpuStatsMsg_Reader ftraceCpuStatsMsg(cpuStats.data_, cpuStats.size_);
722         auto s = *ftraceCpuStatsMsg.per_cpu_stats();
723         ProtoReader::PerCpuStatsMsg_Reader perCpuStatsMsg(s.data_, s.size_);
724         TS_LOGD("s.overrun():%" PRIu64 "", perCpuStatsMsg.overrun());
725         TS_LOGD("s.dropped_events():%" PRIu64 "", perCpuStatsMsg.dropped_events());
726         auto clock = ftraceCpuStatsMsg.trace_clock().ToStdString();
727         if (clock == "boot") {
728             clock_ = TS_CLOCK_BOOTTIME;
729         } else if (clock == "mono") {
730             clock_ = TS_MONOTONIC;
731         } else {
732             TS_LOGI("invalid clock:%s", clock.c_str());
733             dataSeg.status = TS_PARSE_STATUS_INVALID;
734             return;
735         }
736         dataSeg.clockId = clock_;
737         dataSourceTypeTraceClockid_ = clock_;
738         dataSeg.status = TS_PARSE_STATUS_PARSED;
739         return;
740     }
741     bool haveSplitSeg = false;
742     dataSeg.clockId = clock_;
743     if (tracePluginResult.has_ftrace_cpu_detail()) {
744         htraceCpuDetailParser_->Parse(dataSeg, tracePluginResult, haveSplitSeg);
745     }
746     if (tracePluginResult.has_symbols_detail()) {
747         htraceSymbolsDetailParser_->Parse(dataSeg.protoData); // has Event
748         haveSplitSeg = true;
749     }
750     if (tracePluginResult.has_clocks_detail()) {
751         pbreaderClockDetailParser_->Parse(dataSeg.protoData); // has Event
752         haveSplitSeg = true;
753     }
754     if (traceDataCache_->isSplitFile_ && haveSplitSeg) {
755         mPbreaderSplitData_.emplace(splitFileOffset_, nextLength_ + packetSegLength_);
756     }
757     if (tracePluginResult.has_ftrace_cpu_detail() || tracePluginResult.has_clocks_detail() ||
758         tracePluginResult.has_symbols_detail()) {
759         dataSeg.status = TS_PARSE_STATUS_PARSED;
760     } else {
761         dataSeg.status = TS_PARSE_STATUS_INVALID;
762     }
763 }
764 #endif
765 #ifdef ENABLE_FFRT
// Handles an FFRT configuration segment: lets the FFRT parser pick up its source clock id from the
// segment, then stamps the segment's data type and marks it as parsed.
void PbreaderParser::ParseFfrtConfig(PbreaderDataSegment &dataSeg)
{
    // NOTE(review): SetFfrtSrcClockid receives the whole segment; which fields it reads is not
    // visible here, so the call is kept ahead of the dataType/status writes.
    pbreaderFfrtParser_->SetFfrtSrcClockid(dataSeg);
    dataSeg.dataType = DATA_SOURCE_TYPE_FFRT_CONFIG;
    dataSeg.status = TS_PARSE_STATUS_PARSED;
}
ParseFfrt(PbreaderDataSegment &dataSeg)772 void PbreaderParser::ParseFfrt(PbreaderDataSegment &dataSeg)
773 {
774     bool haveSplitSeg = false;
775     pbreaderFfrtParser_->Parser(dataSeg, haveSplitSeg);
776     if (haveSplitSeg) {
777         mPbreaderSplitData_.emplace(splitFileOffset_, nextLength_ + packetSegLength_);
778     }
779     dataSeg.dataType = DATA_SOURCE_TYPE_FFRT;
780     dataSeg.status = TS_PARSE_STATUS_PARSED;
781 }
782 #endif
783 #ifdef ENABLE_HTDUMP
// Tags an FPS segment: sets its data type and then marks it as parsed.
// No payload decoding happens here.
void PbreaderParser::ParseFPS(PbreaderDataSegment &dataSeg)
{
    dataSeg.dataType = DATA_SOURCE_TYPE_FPS;
    // The status is written last, after the data type.
    dataSeg.status = TS_PARSE_STATUS_PARSED;
}
789 #endif
790 
791 #ifdef ENABLE_CPUDATA
ParseCpuUsage(PbreaderDataSegment &dataSeg)792 void PbreaderParser::ParseCpuUsage(PbreaderDataSegment &dataSeg)
793 {
794     dataSourceTypeCpuClockid_ = TS_CLOCK_REALTIME;
795     dataSeg.dataType = DATA_SOURCE_TYPE_CPU;
796     dataSeg.status = TS_PARSE_STATUS_PARSED;
797 }
798 #endif
799 #ifdef ENABLE_NETWORK
ParseNetwork(PbreaderDataSegment &dataSeg)800 void PbreaderParser::ParseNetwork(PbreaderDataSegment &dataSeg)
801 {
802     dataSourceTypeNetworkClockid_ = TS_CLOCK_REALTIME;
803     dataSeg.dataType = DATA_SOURCE_TYPE_NETWORK;
804     dataSeg.status = TS_PARSE_STATUS_PARSED;
805 }
806 #endif
ParseXpower(PbreaderDataSegment &dataSeg)807 void PbreaderParser::ParseXpower(PbreaderDataSegment &dataSeg)
808 {
809     dataSourceTypeNetworkClockid_ = TS_CLOCK_REALTIME;
810     dataSeg.dataType = DATA_SOURCE_TYPE_XPOWER;
811     dataSeg.status = TS_PARSE_STATUS_PARSED;
812 }
813 #ifdef ENABLE_DISKIO
ParseDiskIO(PbreaderDataSegment &dataSeg)814 void PbreaderParser::ParseDiskIO(PbreaderDataSegment &dataSeg)
815 {
816     dataSourceTypeDiskioClockid_ = TS_CLOCK_REALTIME;
817     dataSeg.dataType = DATA_SOURCE_TYPE_DISKIO;
818     dataSeg.status = TS_PARSE_STATUS_PARSED;
819 }
820 #endif
821 
822 #ifdef ENABLE_PROCESS
ParseProcess(PbreaderDataSegment &dataSeg)823 void PbreaderParser::ParseProcess(PbreaderDataSegment &dataSeg)
824 {
825     dataSourceTypeProcessClockid_ = TS_CLOCK_BOOTTIME;
826     dataSeg.dataType = DATA_SOURCE_TYPE_PROCESS;
827     dataSeg.status = TS_PARSE_STATUS_PARSED;
828 }
829 #endif
830 
831 #ifdef ENABLE_HISYSEVENT
ParseHisysevent(PbreaderDataSegment &dataSeg)832 void PbreaderParser::ParseHisysevent(PbreaderDataSegment &dataSeg)
833 {
834     dataSourceTypeHisyseventClockid_ = TS_CLOCK_REALTIME;
835     dataSeg.dataType = DATA_SOURCE_TYPE_HISYSEVENT;
836     dataSeg.status = TS_PARSE_STATUS_PARSED;
837 }
ParseHisyseventConfig(PbreaderDataSegment &dataSeg)838 void PbreaderParser::ParseHisyseventConfig(PbreaderDataSegment &dataSeg)
839 {
840     dataSourceTypeHisyseventClockid_ = TS_CLOCK_REALTIME;
841     dataSeg.dataType = DATA_SOURCE_TYPE_HISYSEVENT_CONFIG;
842     dataSeg.status = TS_PARSE_STATUS_PARSED;
843 }
844 #endif
845 
846 #ifdef ENABLE_ARKTS
ParseJSMemory(const ProtoReader::ProfilerPluginData_Reader &pluginDataZero, PbreaderDataSegment &dataSeg, bool isSplitFile)847 void PbreaderParser::ParseJSMemory(const ProtoReader::ProfilerPluginData_Reader &pluginDataZero,
848                                    PbreaderDataSegment &dataSeg,
849                                    bool isSplitFile)
850 {
851     if (isSplitFile) {
852         dataSourceType_ = DATA_SOURCE_TYPE_JSMEMORY;
853         profilerPluginData_.name = pluginDataZero.name().ToStdString();
854         profilerPluginData_.status = pluginDataZero.status();
855         profilerPluginData_.clockId = pluginDataZero.clock_id();
856         profilerPluginData_.tvSec = pluginDataZero.tv_sec();
857         profilerPluginData_.tvNsec = pluginDataZero.tv_nsec();
858         profilerPluginData_.version = pluginDataZero.version().ToStdString();
859         profilerPluginData_.sampleInterval = pluginDataZero.sample_interval();
860     }
861     dataSourceTypeJSMemoryClockid_ = TS_CLOCK_REALTIME;
862     dataSeg.dataType = DATA_SOURCE_TYPE_JSMEMORY;
863     dataSeg.status = TS_PARSE_STATUS_PARSED;
864 }
ParseJSMemoryConfig(PbreaderDataSegment &dataSeg)865 void PbreaderParser::ParseJSMemoryConfig(PbreaderDataSegment &dataSeg)
866 {
867     dataSourceTypeJSMemoryClockid_ = TS_CLOCK_REALTIME;
868     dataSeg.dataType = DATA_SOURCE_TYPE_JSMEMORY_CONFIG;
869     dataSeg.status = TS_PARSE_STATUS_PARSED;
870 }
871 #endif
872 
873 #ifdef ENABLE_STREAM_EXTEND
// Tags a stream segment: sets its data type and then marks it as parsed.
// No payload decoding happens here.
void PbreaderParser::ParseStream(PbreaderDataSegment &dataSeg)
{
    dataSeg.dataType = DATA_SOURCE_TYPE_STREAM;
    // The status is written last, after the data type.
    dataSeg.status = TS_PARSE_STATUS_PARSED;
}
879 #endif
880 
GetNextSegment()881 int32_t PbreaderParser::GetNextSegment()
882 {
883     int32_t head;
884     std::lock_guard<std::mutex> muxLockGuard(pbreaderDataSegMux_);
885     head = parseHead_;
886     PbreaderDataSegment &pbreaderDataSegmentSeg = dataSegArray_[head];
887     if (pbreaderDataSegmentSeg.status.load() != TS_PARSE_STATUS_SEPRATED) {
888         if (toExit_) {
889             parserThreadCount_--;
890             TS_LOGI("exiting parser, parserThread Count:%d\n", parserThreadCount_);
891             TS_LOGI("seprateHead_x:\t%d, parseHead_:\t%d, filterHead_:\t%d status:%d\n", rawDataHead_, parseHead_,
892                     filterHead_, pbreaderDataSegmentSeg.status.load());
893             if (!parserThreadCount_ && !filterThreadStarted_) {
894                 exited_ = true;
895             }
896             return ERROR_CODE_EXIT;
897         }
898         usleep(sleepDur_);
899         return ERROR_CODE_NODATA;
900     }
901     parseHead_ = (parseHead_ + 1) % maxSegArraySize;
902     pbreaderDataSegmentSeg.status = TS_PARSE_STATUS_PARSING;
903     return head;
904 }
905 #ifdef ENABLE_EBPF
CalcEbpfCutOffset(std::deque<uint8_t>::iterator &packagesBegin, size_t &currentLength)906 bool PbreaderParser::CalcEbpfCutOffset(std::deque<uint8_t>::iterator &packagesBegin, size_t &currentLength)
907 {
908     auto standaloneDataLength = profilerDataLength_ - packetHeaderLength_;
909     if (traceDataCache_->isSplitFile_ && !parsedEbpfOver_) {
910         if (!hasInitEbpfPublicData_) {
911             // Record the offset of Hiperf's 1024-byte header relative to the entire file.
912             ebpfDataParser_->SetEbpfDataOffset(processedDataLen_);
913             ebpfDataParser_->SetSpliteTimeRange(traceDataCache_->SplitFileMinTime(),
914                                                 traceDataCache_->SplitFileMaxTime());
915             parsedFileOffset_ += profilerDataLength_ - packetHeaderLength_;
916             hasInitEbpfPublicData_ = true;
917         }
918         parsedEbpfOver_ = ebpfDataParser_->AddAndSplitEbpfData(packagesBuffer_);
919         if (parsedEbpfOver_) {
920             profilerDataType_ = ProfilerTraceFileHeader::UNKNOW_TYPE;
921             hasGotHeader_ = false;
922             processedDataLen_ += standaloneDataLength;
923         }
924         return false;
925     }
926     if (!traceDataCache_->isSplitFile_ && packagesBuffer_.size() >= standaloneDataLength) {
927         ebpfDataParser_->InitAndParseEbpfData(packagesBuffer_, standaloneDataLength);
928         currentLength -= standaloneDataLength;
929         packagesBegin += standaloneDataLength;
930         profilerDataType_ = ProfilerTraceFileHeader::UNKNOW_TYPE;
931         hasGotHeader_ = false;
932         return true;
933     }
934     return false;
935 }
936 #endif
937 
GetHeaderAndUpdateLengthMark(std::deque<uint8_t>::iterator &packagesBegin, size_t &currentLength)938 bool PbreaderParser::GetHeaderAndUpdateLengthMark(std::deque<uint8_t>::iterator &packagesBegin, size_t &currentLength)
939 {
940     if (!hasGotHeader_) {
941         if (!InitProfilerTraceFileHeader()) {
942             return false;
943         }
944         packagesBuffer_.erase(packagesBuffer_.begin(), packagesBuffer_.begin() + packetHeaderLength_);
945         processedDataLen_ += packetHeaderLength_;
946         currentLength -= packetHeaderLength_;
947         packagesBegin += packetHeaderLength_;
948         parsedFileOffset_ += packetHeaderLength_;
949         pbreaderCurentLength_ = profilerDataLength_ - packetHeaderLength_;
950         hasGotHeader_ = true;
951         if (!currentLength) {
952             return false;
953         }
954     }
955     return true;
956 }
957 #if IS_WASM
ParseSDKData()958 bool PbreaderParser::ParseSDKData()
959 {
960     if (packagesBuffer_.size() >= profilerDataLength_ - packetHeaderLength_) {
961         auto thirdPartySize = profilerDataLength_ - packetHeaderLength_;
962         auto buffer = std::make_unique<uint8_t[]>(thirdPartySize).get();
963         std::copy(packagesBuffer_.begin(), packagesBuffer_.begin() + thirdPartySize, buffer);
964         TraceStreamerPluginOutFilter(reinterpret_cast<const char *>(buffer), thirdPartySize, standalonePluginName_);
965         return true;
966     }
967     return false;
968 }
969 #endif
970 
ParseSegLengthAndEnsureSegDataEnough(std::deque<uint8_t>::iterator &packagesBegin, size_t &currentLength)971 bool PbreaderParser::ParseSegLengthAndEnsureSegDataEnough(std::deque<uint8_t>::iterator &packagesBegin,
972                                                           size_t &currentLength)
973 {
974     std::string bufferLine;
975     if (!hasGotSegLength_) {
976         if (currentLength < packetSegLength_) {
977             return false;
978         }
979         bufferLine.assign(packagesBegin, packagesBegin + packetSegLength_);
980         const uint32_t *len = reinterpret_cast<const uint32_t *>(bufferLine.data());
981         nextLength_ = *len;
982         lenBuffer_ = bufferLine;
983         pbreaderLength_ += nextLength_ + packetSegLength_;
984         hasGotSegLength_ = true;
985         currentLength -= packetSegLength_;
986         packagesBegin += packetSegLength_;
987         parsedFileOffset_ += packetSegLength_;
988         splitFileOffset_ = profilerDataLength_ - pbreaderCurentLength_;
989         pbreaderCurentLength_ -= packetSegLength_;
990     }
991     if (currentLength < nextLength_) {
992         return false;
993     }
994     return true;
995 }
// Consumes as much of packagesBuffer_ as possible, one data block (header + payload) at a time.
//
// After making sure the block header has been read, the payload is dispatched by data type:
//   - HIPERF_DATA (when ENABLE_HIPERF is defined) goes to ParseHiperfData();
//   - STANDALONE_DATA from the eBPF plugin goes to CalcEbpfCutOffset() (or returns false when eBPF
//     support is compiled out); other standalone data goes to ParseSDKData() in WASM builds;
//   - everything else is treated as a sequence of length-prefixed segments, each passed to
//     ParseTraceDataItem().
// When the current block has been fully consumed, the header state is reset, the consumed bytes are
// erased from the buffer and the function recurses to handle the next block.
//
// @param packagesBegin cursor into packagesBuffer_; advanced as data is consumed
// @param currentLength number of unread bytes; reduced as data is consumed
// @return false when the header is unavailable (or a dispatched handler returns false); true when
//         the loop stops because the next segment is not fully buffered yet
bool PbreaderParser::ParseDataRecursively(std::deque<uint8_t>::iterator &packagesBegin, size_t &currentLength)
{
    TS_CHECK_TRUE_RET(GetHeaderAndUpdateLengthMark(packagesBegin, currentLength), false);
#ifdef ENABLE_HIPERF
    if (profilerDataType_ == ProfilerTraceFileHeader::HIPERF_DATA) {
        return ParseHiperfData(packagesBegin, currentLength);
    }
#endif
    if (profilerDataType_ == ProfilerTraceFileHeader::STANDALONE_DATA) {
        if (EBPF_PLUGIN_NAME.compare(standalonePluginName_) == 0) {
#ifdef ENABLE_EBPF
            return CalcEbpfCutOffset(packagesBegin, currentLength);
#else
            return false;
#endif
        } else {
#if IS_WASM
            TS_CHECK_TRUE_RET(ParseSDKData(), false); // Third-party SDK logic still needs to be verified.
#endif
        }
    }
    std::string bufferLine;
    while (true) {
        // Stop (successfully) as soon as the next segment is not completely buffered.
        TS_CHECK_TRUE_RET(ParseSegLengthAndEnsureSegDataEnough(packagesBegin, currentLength), true);
        bufferLine.assign(packagesBegin, packagesBegin + nextLength_);
        ParseTraceDataItem(bufferLine);
        hasGotSegLength_ = false;
        packagesBegin += nextLength_;
        currentLength -= nextLength_;
        parsedFileOffset_ += nextLength_;
        // NOTE(review): the mismatch is only logged; the subtraction below still runs, so
        // pbreaderCurentLength_ would wrap around if it is an unsigned type (it is printed with PRIu64).
        if (nextLength_ > pbreaderCurentLength_) {
            TS_LOGE("fatal error, data length not match nextLength_:%u, pbreaderCurentLength_:%" PRIu64 "", nextLength_,
                    pbreaderCurentLength_);
        }
        pbreaderCurentLength_ -= nextLength_;
        if (pbreaderCurentLength_ == 0) {
            // Current block finished: drop what was consumed and start over with the next header.
            hasGotHeader_ = false;
            processedDataLen_ += packagesBegin - packagesBuffer_.begin();
            packagesBuffer_.erase(packagesBuffer_.begin(), packagesBegin);
            profilerDataType_ = ProfilerTraceFileHeader::UNKNOW_TYPE;
            TS_LOGD("read proto finished!");
            return ParseDataRecursively(packagesBegin, currentLength);
        }
    }
    return true;
}
1042 
ParseTraceDataSegment(std::unique_ptr<uint8_t[]> bufferStr, size_t size, bool isFinish)1043 void PbreaderParser::ParseTraceDataSegment(std::unique_ptr<uint8_t[]> bufferStr, size_t size, bool isFinish)
1044 {
1045     packagesBuffer_.insert(packagesBuffer_.end(), &bufferStr[0], &bufferStr[size]);
1046     auto packagesBegin = packagesBuffer_.begin();
1047     auto currentLength = packagesBuffer_.size();
1048     if (ParseDataRecursively(packagesBegin, currentLength)) {
1049         processedDataLen_ += packagesBegin - packagesBuffer_.begin();
1050         packagesBuffer_.erase(packagesBuffer_.begin(), packagesBegin);
1051     }
1052     return;
1053 }
1054 #ifdef ENABLE_HIPERF
ParseHiperfData(std::deque<uint8_t>::iterator &packagesBegin, size_t &currentLength)1055 bool PbreaderParser::ParseHiperfData(std::deque<uint8_t>::iterator &packagesBegin, size_t &currentLength)
1056 {
1057     if (!traceDataCache_->isSplitFile_) {
1058         if (packagesBuffer_.size() >= profilerDataLength_ - packetHeaderLength_) {
1059             auto size = profilerDataLength_ - packetHeaderLength_;
1060             (void)perfDataParser_->InitPerfDataAndLoad(packagesBuffer_, size, processedDataLen_, false, true);
1061             currentLength -= size;
1062             packagesBegin += size;
1063             profilerDataType_ = ProfilerTraceFileHeader::UNKNOW_TYPE;
1064             hasGotHeader_ = false;
1065             return true;
1066         }
1067         return false;
1068     }
1069     bool isFinish = perfProcessedLen_ + packagesBuffer_.size() >= profilerDataLength_ - packetHeaderLength_;
1070     auto size = packagesBuffer_.size();
1071     if (isFinish) {
1072         size = profilerDataLength_ - packetHeaderLength_ - perfProcessedLen_;
1073     }
1074     auto ret = perfDataParser_->InitPerfDataAndLoad(packagesBuffer_, size, processedDataLen_, true, isFinish);
1075     perfProcessedLen_ += ret;
1076     currentLength -= ret;
1077     packagesBegin += ret;
1078     if (isFinish) {
1079         profilerDataType_ = ProfilerTraceFileHeader::UNKNOW_TYPE;
1080         hasGotHeader_ = false;
1081     }
1082     return true;
1083 }
StoreTraceDataSegment(std::unique_ptr<uint8_t[]> bufferStr, size_t size, int32_t isFinish)1084 void PbreaderParser::StoreTraceDataSegment(std::unique_ptr<uint8_t[]> bufferStr, size_t size, int32_t isFinish)
1085 {
1086     packagesBuffer_.insert(packagesBuffer_.end(), &bufferStr[0], &bufferStr[size]);
1087     if (!traceDataCache_->isSplitFile_) {
1088         return;
1089     }
1090 
1091     uint64_t length = packagesBuffer_.size();
1092     auto ret = perfDataParser_->InitPerfDataAndLoad(packagesBuffer_, length, 0, true, isFinish);
1093     perfProcessedLen_ += ret;
1094     processedDataLen_ += ret;
1095     packagesBuffer_.erase(packagesBuffer_.begin(), packagesBuffer_.begin() + ret);
1096     return;
1097 }
TraceDataSegmentEnd(bool isSplitFile)1098 void PbreaderParser::TraceDataSegmentEnd(bool isSplitFile)
1099 {
1100     perfDataParser_->InitPerfDataAndLoad(packagesBuffer_, packagesBuffer_.size(), 0, isSplitFile, true);
1101     packagesBuffer_.clear();
1102     return;
1103 }
1104 #endif
1105 
InitProfilerTraceFileHeader()1106 bool PbreaderParser::InitProfilerTraceFileHeader()
1107 {
1108     if (packagesBuffer_.size() < packetHeaderLength_) {
1109         TS_LOGI("buffer size less than profiler trace file header");
1110         return false;
1111     }
1112     uint8_t buffer[packetHeaderLength_];
1113     (void)memset_s(buffer, sizeof(buffer), 0, sizeof(buffer));
1114     int32_t i = 0;
1115     for (auto it = packagesBuffer_.begin(); it != packagesBuffer_.begin() + packetHeaderLength_; ++it, ++i) {
1116         buffer[i] = *it;
1117     }
1118     ProfilerTraceFileHeader *pHeader = reinterpret_cast<ProfilerTraceFileHeader *>(buffer);
1119     if (pHeader->data.length <= packetHeaderLength_ || pHeader->data.magic != ProfilerTraceFileHeader::HEADER_MAGIC) {
1120         TS_LOGE("Profiler Trace data is truncated or invalid magic! len = %" PRIu64 ", maigc = %" PRIx64 "",
1121                 pHeader->data.length, pHeader->data.magic);
1122         return false;
1123     }
1124     if (pHeader->data.dataType == ProfilerTraceFileHeader::HIPERF_DATA) {
1125 #ifdef ENABLE_HIPERF
1126         perfDataParser_->RecordPerfProfilerHeader(buffer, packetHeaderLength_);
1127 #endif
1128     } else if (pHeader->data.dataType == ProfilerTraceFileHeader::STANDALONE_DATA &&
1129                EBPF_PLUGIN_NAME.compare(pHeader->data.standalonePluginName) == 0) {
1130 #ifdef ENABLE_EBPF
1131         ebpfDataParser_->RecordEbpfProfilerHeader(buffer, packetHeaderLength_);
1132 #endif
1133     } else {
1134         auto ret = memcpy_s(&profilerTraceFileHeader_, sizeof(profilerTraceFileHeader_), buffer, packetHeaderLength_);
1135         if (ret == -1 || profilerTraceFileHeader_.data.magic != ProfilerTraceFileHeader::HEADER_MAGIC) {
1136             TS_LOGE("Get profiler trace file header failed! ret = %d, magic = %" PRIx64 "", ret,
1137                     profilerTraceFileHeader_.data.magic);
1138             return false;
1139         }
1140     }
1141     profilerDataLength_ = pHeader->data.length;
1142     profilerDataType_ = pHeader->data.dataType;
1143     memcpy_s(standalonePluginName_, sizeof(standalonePluginName_), pHeader->data.standalonePluginName,
1144              sizeof(standalonePluginName_));
1145 
1146     TS_LOGI("magic = %" PRIx64 ", length = %" PRIu64 ", dataType = %x, boottime = %" PRIu64 "", pHeader->data.magic,
1147             pHeader->data.length, pHeader->data.dataType, pHeader->data.boottime);
1148 #if IS_WASM
1149     const int32_t DATA_TYPE_CLOCK = 100;
1150     TraceStreamerPluginOutSendData(reinterpret_cast<char *>(buffer), packetHeaderLength_, DATA_TYPE_CLOCK);
1151 #endif
1152     pbreaderClockDetailParser_->Parse(pHeader);
1153     return true;
1154 }
1155 
1156 #if defined(ENABLE_HTRACE) && defined(ENABLE_NATIVE_HOOK) && defined(ENABLE_HIPERF)
ParseNapiAsync()1157 void PbreaderParser::ParseNapiAsync()
1158 {
1159     // 将native memory中存在的traceid取出, 并记录其对应的callstackid
1160     std::unordered_map<std::string, uint32_t> traceidToCallchainidMap;
1161     GetTraceidInfoFromNativeHook(traceidToCallchainidMap);
1162 
1163     // 从callstack表中获取所有的traceid, 根据其所属的itid将SliceInfo存入对应queue
1164     std::unordered_map<uint64_t, std::queue<SliceInfo>> itidToCallstackIdsMap;
1165     GetTraceidInfoFromCallstack(traceidToCallchainidMap, itidToCallstackIdsMap);
1166 
1167     // 筛选出包含NativeAsyncWork::AsyncWorkCallback的函数栈的callchainid, 将其存入callchainIdSet
1168     std::unordered_set<uint32_t> callchainIdSet;
1169     GetCallchainIdSetFromHiperf(callchainIdSet);
1170 
1171     DumpDataFromHiperf(traceidToCallchainidMap, callchainIdSet, itidToCallstackIdsMap);
1172 }
1173 
GetTraceidInfoFromNativeHook(std::unordered_map<std::string, uint32_t> &traceidToCallchainidMap)1174 void PbreaderParser::GetTraceidInfoFromNativeHook(std::unordered_map<std::string, uint32_t> &traceidToCallchainidMap)
1175 {
1176     auto nativeHook = traceDataCache_->GetConstNativeHookData();
1177     std::string preWord("napi:");
1178     for (int i = 0; i < nativeHook.Size(); i++) {
1179         auto subType = nativeHook.SubTypes()[i];
1180         if (subType == INVALID_UINT64) {
1181             continue;
1182         }
1183         auto subTypeStr = traceDataCache_->GetDataFromDict(subType);
1184         if (!StartWith(subTypeStr, preWord)) {
1185             continue;
1186         }
1187         auto pos = subTypeStr.find(preWord) + preWord.size();
1188         auto traceidStr = subTypeStr.substr(pos, subTypeStr.find_last_of(':') - pos);
1189         auto traceidIndex = traceDataCache_->GetDataIndex(traceidStr);
1190         traceidToCallchainidMap.emplace(std::move(traceidStr), nativeHook.CallChainIds()[i]);
1191     }
1192 }
1193 
GetTraceidInfoFromCallstack( const std::unordered_map<std::string, uint32_t> &traceidToCallchainidMap, std::unordered_map<uint64_t, std::queue<SliceInfo>> &itidToCallstackIdsMap)1194 void PbreaderParser::GetTraceidInfoFromCallstack(
1195     const std::unordered_map<std::string, uint32_t> &traceidToCallchainidMap,
1196     std::unordered_map<uint64_t, std::queue<SliceInfo>> &itidToCallstackIdsMap)
1197 {
1198     auto callStack = traceDataCache_->GetConstInternalSlicesData();
1199     std::string preWord("traceid:");
1200     std::string invalidTraceidStr("0x0");
1201     for (int i = 0; i < callStack.Size(); i++) {
1202         auto name = traceDataCache_->GetDataFromDict(callStack.NamesData()[i]);
1203         if (!StartWith(name, "H:Napi execute, name:") || callStack.DursData()[i] == INVALID_UINT64) {
1204             continue;
1205         }
1206         auto traceidStr = name.substr(name.find(preWord) + preWord.size());
1207         if (traceidStr == invalidTraceidStr) {
1208             continue;
1209         }
1210         if (traceidToCallchainidMap.find(traceidStr) == traceidToCallchainidMap.end()) {
1211             continue;
1212         }
1213         auto iter = itidToCallstackIdsMap.find(callStack.CallIds()[i]);
1214         if (iter == itidToCallstackIdsMap.end()) {
1215             itidToCallstackIdsMap.emplace(callStack.CallIds()[i], std::queue<SliceInfo>());
1216             iter = itidToCallstackIdsMap.find(callStack.CallIds()[i]);
1217         }
1218         iter->second.emplace(callStack.TimeStampData()[i], callStack.TimeStampData()[i] + callStack.DursData()[i],
1219                              traceidStr);
1220     }
1221 }
1222 
GetCallchainIdSetFromHiperf(std::unordered_set<uint32_t> &callchainIdSet)1223 void PbreaderParser::GetCallchainIdSetFromHiperf(std::unordered_set<uint32_t> &callchainIdSet)
1224 {
1225     auto perfCallChain = traceDataCache_->GetConstPerfCallChainData();
1226     std::string asyncWork("NativeAsyncWork::AsyncWorkCallback");
1227     for (int i = 0; i < perfCallChain.Size(); i++) {
1228         auto callchainId = perfCallChain.CallChainIds()[i];
1229         if (callchainIdSet.find(callchainId) != callchainIdSet.end()) {
1230             continue;
1231         }
1232         auto nameIndex = perfCallChain.Names()[i];
1233         if (nameIndex == INVALID_UINT64) {
1234             continue;
1235         }
1236         auto name = traceDataCache_->GetDataFromDict(nameIndex);
1237         if (name.find(asyncWork) != std::string::npos) {
1238             callchainIdSet.emplace(perfCallChain.CallChainIds()[i]);
1239         }
1240     }
1241 }
1242 
DumpDataFromHiperf(const std::unordered_map<std::string, uint32_t> &traceidToCallchainidMap, const std::unordered_set<uint32_t> &callchainIdSet, std::unordered_map<uint64_t, std::queue<SliceInfo>> &itidToCallstackIdsMap)1243 void PbreaderParser::DumpDataFromHiperf(const std::unordered_map<std::string, uint32_t> &traceidToCallchainidMap,
1244                                         const std::unordered_set<uint32_t> &callchainIdSet,
1245                                         std::unordered_map<uint64_t, std::queue<SliceInfo>> &itidToCallstackIdsMap)
1246 {
1247     auto perfThread = traceDataCache_->GetConstPerfThreadData();
1248     std::unordered_map<uint32_t, uint32_t> tidToPidMap;
1249     for (size_t i = 0; i < perfThread.Size(); i++) {
1250         tidToPidMap.emplace(perfThread.Tids()[i], perfThread.Pids()[i]);
1251     }
1252     auto perfSample = traceDataCache_->GetConstPerfSampleData();
1253     auto callStack = traceDataCache_->GetConstInternalSlicesData();
1254     for (size_t i = 0; i < perfSample.Size(); i++) {
1255         // callchainid未命中, 即当前栈不包含NativeAsyncWork::AsyncWorkCallback
1256         if (callchainIdSet.find(perfSample.SampleIds()[i]) == callchainIdSet.end()) {
1257             continue;
1258         }
1259         // 根据tid和tsPerfSample查询对应的SliceInfo
1260         auto itid = streamFilters_->processFilter_->GetInternalTid(perfSample.Tids()[i]);
1261         if (itidToCallstackIdsMap.find(itid) == itidToCallstackIdsMap.end()) {
1262             continue;
1263         }
1264         auto tsPerfSample = perfSample.TimestampTraces()[i];
1265         auto queue = itidToCallstackIdsMap.at(itid);
1266         while (!queue.empty() && queue.front().tsEnd_ < tsPerfSample) {
1267             queue.pop();
1268         }
1269         if (queue.empty() || tsPerfSample < queue.front().tsBegin_) {
1270             continue;
1271         }
1272         // 根据traceid查询native侧的callchainid
1273         auto iterNative = traceidToCallchainidMap.find(queue.front().traceid_);
1274         if (iterNative == traceidToCallchainidMap.end()) {
1275             continue;
1276         }
1277         // 根据tid查询pid
1278         auto iterPid = tidToPidMap.find(perfSample.Tids()[i]);
1279         if (iterPid == tidToPidMap.end()) {
1280             continue;
1281         }
1282         PerfNapiAsyncRow perfNapiAsyncRow{
1283             .timeStamp = tsPerfSample,
1284             .traceid = traceDataCache_->GetDataIndex(queue.front().traceid_),
1285             .cpuId = static_cast<uint8_t>(perfSample.CpuIds()[i]),
1286             .threadId = perfSample.Tids()[i],
1287             .processId = iterPid->second,
1288             .callerCallchainid = iterNative->second,
1289             .calleeCallchainid = perfSample.SampleIds()[i],
1290             .perfSampleId = i,
1291             .eventCount = perfSample.EventCounts()[i],
1292             .eventTypeId = perfSample.EventTypeIds()[i],
1293         };
1294         traceDataCache_->GetPerfNapiAsyncData()->AppendNewPerfNapiAsync(perfNapiAsyncRow);
1295     }
1296 }
1297 #endif
1298 } // namespace TraceStreamer
1299 } // namespace SysTuning
1300