1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from ohos.parser import *
20
21__all__ = ["OHYaraTestParser"]
22
23LOG = platform_logger("OHYaraTestParser")
24
25
26@Plugin(type=Plugin.PARSER, id=CommonParserType.oh_yara)
27class OHYaraTestParser(IParser):
28    last_line = ""
29    pattern = r"(\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}\.\d{3}) "
30
31    def __init__(self):
32        self.state_machine = StateRecorder()
33        self.suites_name = ""
34        self.vul_items = None
35        self.listeners = []
36
37    def get_listeners(self):
38        return self.listeners
39
40    def __process__(self, lines):
41        self.parse(lines)
42
43    def __done__(self):
44        pass
45
46    def parse(self, lines):
47        for line in lines:
48            if line:
49                self.handle_suites_started_tag()
50                self.handle_suite_started_tag()
51                self.handle_one_test_tag(line)
52                self.handle_suite_ended_tag()
53                self.handle_suites_ended_tag()
54
55    def handle_suites_started_tag(self):
56        self.state_machine.get_suites(reset=True)
57        test_suites = self.state_machine.get_suites()
58        test_suites.suites_name = self.suites_name
59        test_suites.test_num = len(self.vul_items)
60        for listener in self.get_listeners():
61            suite_report = copy.copy(test_suites)
62            listener.__started__(LifeCycle.TestSuites, suite_report)
63
64    def handle_suites_ended_tag(self):
65        suites = self.state_machine.get_suites()
66        suites.is_completed = True
67        for listener in self.get_listeners():
68            listener.__ended__(LifeCycle.TestSuites, test_result=suites,
69                               suites_name=suites.suites_name)
70
71    def handle_one_test_tag(self, message):
72        status_dict = {"pass": ResultCode.PASSED, "fail": ResultCode.FAILED,
73                       "block": ResultCode.BLOCKED}
74        message = message.strip().split("|")
75        test_name = message[0]
76        status = status_dict.get(message[3])
77        trace = message[6] if message[3] else ""
78        run_time = 0
79        test_suite = self.state_machine.suite()
80        test_result = self.state_machine.test(reset=True)
81        test_result.test_class = test_suite.suite_name
82        test_result.test_name = test_name
83        test_result.run_time = run_time
84        test_result.code = status.value
85        test_result.stacktrace = trace
86        test_result.current = self.state_machine.running_test_index + 1
87        self.state_machine.suite().run_time += run_time
88        for listener in self.get_listeners():
89            test_result = copy.copy(test_result)
90            listener.__started__(LifeCycle.TestCase, test_result)
91
92        test_suites = self.state_machine.get_suites()
93        self.state_machine.test().is_completed = True
94        test_suites.test_num += 1
95        for listener in self.get_listeners():
96            result = copy.copy(test_result)
97            listener.__ended__(LifeCycle.TestCase, result)
98        self.state_machine.running_test_index += 1
99
100    def handle_suite_started_tag(self):
101        self.state_machine.suite(reset=True)
102        self.state_machine.running_test_index = 0
103        test_suite = self.state_machine.suite()
104        test_suite.suite_name = self.suites_name
105        test_suite.test_num = 1
106        for listener in self.get_listeners():
107            suite_report = copy.copy(test_suite)
108            listener.__started__(LifeCycle.TestSuite, suite_report)
109
110    def handle_suite_ended_tag(self):
111        suite_result = self.state_machine.suite()
112        suites = self.state_machine.get_suites()
113        suite_result.run_time = suite_result.run_time
114        suites.run_time += suite_result.run_time
115        suite_result.is_completed = True
116        for listener in self.get_listeners():
117            suite = copy.copy(suite_result)
118            listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True)
119