#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Imported explicitly so the parser does not depend on the star import below
# happening to re-export ``copy``. It is placed first so that whatever the
# star import provides still takes precedence, exactly as before.
import copy

from ohos.parser import *

__all__ = ["OHYaraTestParser"]

LOG = platform_logger("OHYaraTestParser")


@Plugin(type=Plugin.PARSER, id=CommonParserType.oh_yara)
class OHYaraTestParser(IParser):
    """Parser that turns "|"-separated yara result records into test events.

    Every usable input line is reported to the registered listeners as a
    complete TestSuites / TestSuite / TestCase life cycle of its own.

    Record layout, as read by this class (other fields are ignored):
        field 0 - test name
        field 3 - status, one of "pass", "fail" or "block"
        field 6 - trace text stored as the test's stacktrace (optional)
    """

    last_line = ""
    pattern = r"(\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}\.\d{3}) "

    def __init__(self):
        self.state_machine = StateRecorder()
        # Name used for both the suites record and each suite record.
        self.suites_name = ""
        # Sized collection assigned by the caller; its length seeds the
        # suites' test_num. May stay None when the caller sets nothing.
        self.vul_items = None
        self.listeners = []

    def get_listeners(self):
        """Return the list of listeners that receive life-cycle events."""
        return self.listeners

    def __process__(self, lines):
        """Parser entry point: parse the given iterable of text lines."""
        self.parse(lines)

    def __done__(self):
        """Nothing to flush; every line is fully reported when parsed."""
        pass

    def parse(self, lines):
        """Report one full suites/suite/test life cycle per usable line.

        Empty and whitespace-only lines are skipped silently. Lines that do
        not form a valid record are logged and skipped, so that a single bad
        line neither aborts the remaining lines nor produces an empty suite.
        """
        for line in lines:
            if not line or not line.strip():
                continue
            if self._parse_record(line) is None:
                LOG.error("Skip malformed yara result line: {}".format(line))
                continue
            self.handle_suites_started_tag()
            self.handle_suite_started_tag()
            self.handle_one_test_tag(line)
            self.handle_suite_ended_tag()
            self.handle_suites_ended_tag()

    @staticmethod
    def _parse_record(message):
        """Split one record into ``(test_name, status, trace)``.

        Returns None when the line has no status field (fewer than four
        "|"-separated fields) or the status is not "pass"/"fail"/"block".
        A missing trace field yields an empty trace instead of an error.
        """
        status_dict = {"pass": ResultCode.PASSED, "fail": ResultCode.FAILED,
                       "block": ResultCode.BLOCKED}
        fields = message.strip().split("|")
        if len(fields) <= 3:
            return None
        status = status_dict.get(fields[3])
        if status is None:
            return None
        trace = fields[6] if len(fields) > 6 else ""
        return fields[0], status, trace

    def handle_suites_started_tag(self):
        """Start a fresh suites record and notify listeners it has started."""
        self.state_machine.get_suites(reset=True)
        test_suites = self.state_machine.get_suites()
        test_suites.suites_name = self.suites_name
        # vul_items defaults to None; treat "not provided" as zero items
        # rather than failing in len().
        test_suites.test_num = len(self.vul_items) if self.vul_items else 0
        for listener in self.get_listeners():
            suite_report = copy.copy(test_suites)
            listener.__started__(LifeCycle.TestSuites, suite_report)

    def handle_suites_ended_tag(self):
        """Mark the current suites record completed and notify listeners."""
        suites = self.state_machine.get_suites()
        suites.is_completed = True
        for listener in self.get_listeners():
            listener.__ended__(LifeCycle.TestSuites, test_result=suites,
                               suites_name=suites.suites_name)

    def handle_one_test_tag(self, message):
        """Report the single test described by one record line.

        Listeners receive a TestCase started event followed by a TestCase
        ended event, each with its own shallow copy of the test result.
        A malformed record is logged and ignored.

        Args:
            message: one "|"-separated record line (see the class docstring).
        """
        record = self._parse_record(message)
        if record is None:
            LOG.error("Skip malformed yara result line: {}".format(message))
            return
        test_name, status, trace = record
        # The records carry no timing information that this parser reads.
        run_time = 0

        test_suite = self.state_machine.suite()
        test_result = self.state_machine.test(reset=True)
        test_result.test_class = test_suite.suite_name
        test_result.test_name = test_name
        test_result.run_time = run_time
        test_result.code = status.value
        test_result.stacktrace = trace
        test_result.current = self.state_machine.running_test_index + 1
        self.state_machine.suite().run_time += run_time
        for listener in self.get_listeners():
            started_report = copy.copy(test_result)
            listener.__started__(LifeCycle.TestCase, started_report)

        test_suites = self.state_machine.get_suites()
        # Flag completion on the result that is about to be copied, so the
        # ended event carries it (same order as handle_suite_ended_tag).
        self.state_machine.test().is_completed = True
        test_result.is_completed = True
        test_suites.test_num += 1
        for listener in self.get_listeners():
            result = copy.copy(test_result)
            listener.__ended__(LifeCycle.TestCase, result)
        self.state_machine.running_test_index += 1

    def handle_suite_started_tag(self):
        """Start a fresh single-test suite record and notify listeners."""
        self.state_machine.suite(reset=True)
        self.state_machine.running_test_index = 0
        test_suite = self.state_machine.suite()
        test_suite.suite_name = self.suites_name
        # parse() reports exactly one test per suite.
        test_suite.test_num = 1
        for listener in self.get_listeners():
            suite_report = copy.copy(test_suite)
            listener.__started__(LifeCycle.TestSuite, suite_report)

    def handle_suite_ended_tag(self):
        """Close the current suite, add its run time to the suites total
        and notify listeners."""
        suite_result = self.state_machine.suite()
        suites = self.state_machine.get_suites()
        suites.run_time += suite_result.run_time
        suite_result.is_completed = True
        for listener in self.get_listeners():
            suite = copy.copy(suite_result)
            listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True)