xref: /third_party/python/Lib/test/test_dtrace.py (revision 7db96d56)
17db96d56Sopenharmony_ciimport dis
27db96d56Sopenharmony_ciimport os.path
37db96d56Sopenharmony_ciimport re
47db96d56Sopenharmony_ciimport subprocess
57db96d56Sopenharmony_ciimport sys
67db96d56Sopenharmony_ciimport types
77db96d56Sopenharmony_ciimport unittest
87db96d56Sopenharmony_ci
97db96d56Sopenharmony_cifrom test import support
107db96d56Sopenharmony_cifrom test.support import findfile
117db96d56Sopenharmony_ci
127db96d56Sopenharmony_ci
137db96d56Sopenharmony_ciif not support.has_subprocess_support:
147db96d56Sopenharmony_ci    raise unittest.SkipTest("test module requires subprocess")
157db96d56Sopenharmony_ci
167db96d56Sopenharmony_ci
177db96d56Sopenharmony_cidef abspath(filename):
187db96d56Sopenharmony_ci    return os.path.abspath(findfile(filename, subdir="dtracedata"))
197db96d56Sopenharmony_ci
207db96d56Sopenharmony_ci
217db96d56Sopenharmony_cidef normalize_trace_output(output):
227db96d56Sopenharmony_ci    """Normalize DTrace output for comparison.
237db96d56Sopenharmony_ci
247db96d56Sopenharmony_ci    DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers
257db96d56Sopenharmony_ci    are concatenated. So if the operating system moves our thread around, the
267db96d56Sopenharmony_ci    straight result can be "non-causal". So we add timestamps to the probe
277db96d56Sopenharmony_ci    firing, sort by that field, then strip it from the output"""
287db96d56Sopenharmony_ci
297db96d56Sopenharmony_ci    # When compiling with '--with-pydebug', strip '[# refs]' debug output.
307db96d56Sopenharmony_ci    output = re.sub(r"\[[0-9]+ refs\]", "", output)
317db96d56Sopenharmony_ci    try:
327db96d56Sopenharmony_ci        result = [
337db96d56Sopenharmony_ci            row.split("\t")
347db96d56Sopenharmony_ci            for row in output.splitlines()
357db96d56Sopenharmony_ci            if row and not row.startswith('#')
367db96d56Sopenharmony_ci        ]
377db96d56Sopenharmony_ci        result.sort(key=lambda row: int(row[0]))
387db96d56Sopenharmony_ci        result = [row[1] for row in result]
397db96d56Sopenharmony_ci        return "\n".join(result)
407db96d56Sopenharmony_ci    except (IndexError, ValueError):
417db96d56Sopenharmony_ci        raise AssertionError(
427db96d56Sopenharmony_ci            "tracer produced unparsable output:\n{}".format(output)
437db96d56Sopenharmony_ci        )
447db96d56Sopenharmony_ci
457db96d56Sopenharmony_ci
467db96d56Sopenharmony_ciclass TraceBackend:
477db96d56Sopenharmony_ci    EXTENSION = None
487db96d56Sopenharmony_ci    COMMAND = None
497db96d56Sopenharmony_ci    COMMAND_ARGS = []
507db96d56Sopenharmony_ci
517db96d56Sopenharmony_ci    def run_case(self, name, optimize_python=None):
527db96d56Sopenharmony_ci        actual_output = normalize_trace_output(self.trace_python(
537db96d56Sopenharmony_ci            script_file=abspath(name + self.EXTENSION),
547db96d56Sopenharmony_ci            python_file=abspath(name + ".py"),
557db96d56Sopenharmony_ci            optimize_python=optimize_python))
567db96d56Sopenharmony_ci
577db96d56Sopenharmony_ci        with open(abspath(name + self.EXTENSION + ".expected")) as f:
587db96d56Sopenharmony_ci            expected_output = f.read().rstrip()
597db96d56Sopenharmony_ci
607db96d56Sopenharmony_ci        return (expected_output, actual_output)
617db96d56Sopenharmony_ci
627db96d56Sopenharmony_ci    def generate_trace_command(self, script_file, subcommand=None):
637db96d56Sopenharmony_ci        command = self.COMMAND + [script_file]
647db96d56Sopenharmony_ci        if subcommand:
657db96d56Sopenharmony_ci            command += ["-c", subcommand]
667db96d56Sopenharmony_ci        return command
677db96d56Sopenharmony_ci
687db96d56Sopenharmony_ci    def trace(self, script_file, subcommand=None):
697db96d56Sopenharmony_ci        command = self.generate_trace_command(script_file, subcommand)
707db96d56Sopenharmony_ci        stdout, _ = subprocess.Popen(command,
717db96d56Sopenharmony_ci                                     stdout=subprocess.PIPE,
727db96d56Sopenharmony_ci                                     stderr=subprocess.STDOUT,
737db96d56Sopenharmony_ci                                     universal_newlines=True).communicate()
747db96d56Sopenharmony_ci        return stdout
757db96d56Sopenharmony_ci
767db96d56Sopenharmony_ci    def trace_python(self, script_file, python_file, optimize_python=None):
777db96d56Sopenharmony_ci        python_flags = []
787db96d56Sopenharmony_ci        if optimize_python:
797db96d56Sopenharmony_ci            python_flags.extend(["-O"] * optimize_python)
807db96d56Sopenharmony_ci        subcommand = " ".join([sys.executable] + python_flags + [python_file])
817db96d56Sopenharmony_ci        return self.trace(script_file, subcommand)
827db96d56Sopenharmony_ci
837db96d56Sopenharmony_ci    def assert_usable(self):
847db96d56Sopenharmony_ci        try:
857db96d56Sopenharmony_ci            output = self.trace(abspath("assert_usable" + self.EXTENSION))
867db96d56Sopenharmony_ci            output = output.strip()
877db96d56Sopenharmony_ci        except (FileNotFoundError, NotADirectoryError, PermissionError) as fnfe:
887db96d56Sopenharmony_ci            output = str(fnfe)
897db96d56Sopenharmony_ci        if output != "probe: success":
907db96d56Sopenharmony_ci            raise unittest.SkipTest(
917db96d56Sopenharmony_ci                "{}(1) failed: {}".format(self.COMMAND[0], output)
927db96d56Sopenharmony_ci            )
937db96d56Sopenharmony_ci
947db96d56Sopenharmony_ci
957db96d56Sopenharmony_ciclass DTraceBackend(TraceBackend):
967db96d56Sopenharmony_ci    EXTENSION = ".d"
977db96d56Sopenharmony_ci    COMMAND = ["dtrace", "-q", "-s"]
987db96d56Sopenharmony_ci
997db96d56Sopenharmony_ci
1007db96d56Sopenharmony_ciclass SystemTapBackend(TraceBackend):
1017db96d56Sopenharmony_ci    EXTENSION = ".stp"
1027db96d56Sopenharmony_ci    COMMAND = ["stap", "-g"]
1037db96d56Sopenharmony_ci
1047db96d56Sopenharmony_ci
1057db96d56Sopenharmony_ciclass TraceTests:
1067db96d56Sopenharmony_ci    # unittest.TestCase options
1077db96d56Sopenharmony_ci    maxDiff = None
1087db96d56Sopenharmony_ci
1097db96d56Sopenharmony_ci    # TraceTests options
1107db96d56Sopenharmony_ci    backend = None
1117db96d56Sopenharmony_ci    optimize_python = 0
1127db96d56Sopenharmony_ci
1137db96d56Sopenharmony_ci    @classmethod
1147db96d56Sopenharmony_ci    def setUpClass(self):
1157db96d56Sopenharmony_ci        self.backend.assert_usable()
1167db96d56Sopenharmony_ci
1177db96d56Sopenharmony_ci    def run_case(self, name):
1187db96d56Sopenharmony_ci        actual_output, expected_output = self.backend.run_case(
1197db96d56Sopenharmony_ci            name, optimize_python=self.optimize_python)
1207db96d56Sopenharmony_ci        self.assertEqual(actual_output, expected_output)
1217db96d56Sopenharmony_ci
1227db96d56Sopenharmony_ci    def test_function_entry_return(self):
1237db96d56Sopenharmony_ci        self.run_case("call_stack")
1247db96d56Sopenharmony_ci
1257db96d56Sopenharmony_ci    def test_verify_call_opcodes(self):
1267db96d56Sopenharmony_ci        """Ensure our call stack test hits all function call opcodes"""
1277db96d56Sopenharmony_ci
1287db96d56Sopenharmony_ci        opcodes = set(["CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"])
1297db96d56Sopenharmony_ci
1307db96d56Sopenharmony_ci        with open(abspath("call_stack.py")) as f:
1317db96d56Sopenharmony_ci            code_string = f.read()
1327db96d56Sopenharmony_ci
1337db96d56Sopenharmony_ci        def get_function_instructions(funcname):
1347db96d56Sopenharmony_ci            # Recompile with appropriate optimization setting
1357db96d56Sopenharmony_ci            code = compile(source=code_string,
1367db96d56Sopenharmony_ci                           filename="<string>",
1377db96d56Sopenharmony_ci                           mode="exec",
1387db96d56Sopenharmony_ci                           optimize=self.optimize_python)
1397db96d56Sopenharmony_ci
1407db96d56Sopenharmony_ci            for c in code.co_consts:
1417db96d56Sopenharmony_ci                if isinstance(c, types.CodeType) and c.co_name == funcname:
1427db96d56Sopenharmony_ci                    return dis.get_instructions(c)
1437db96d56Sopenharmony_ci            return []
1447db96d56Sopenharmony_ci
1457db96d56Sopenharmony_ci        for instruction in get_function_instructions('start'):
1467db96d56Sopenharmony_ci            opcodes.discard(instruction.opname)
1477db96d56Sopenharmony_ci
1487db96d56Sopenharmony_ci        self.assertEqual(set(), opcodes)
1497db96d56Sopenharmony_ci
1507db96d56Sopenharmony_ci    def test_gc(self):
1517db96d56Sopenharmony_ci        self.run_case("gc")
1527db96d56Sopenharmony_ci
1537db96d56Sopenharmony_ci    def test_line(self):
1547db96d56Sopenharmony_ci        self.run_case("line")
1557db96d56Sopenharmony_ci
1567db96d56Sopenharmony_ci
1577db96d56Sopenharmony_ciclass DTraceNormalTests(TraceTests, unittest.TestCase):
1587db96d56Sopenharmony_ci    backend = DTraceBackend()
1597db96d56Sopenharmony_ci    optimize_python = 0
1607db96d56Sopenharmony_ci
1617db96d56Sopenharmony_ci
1627db96d56Sopenharmony_ciclass DTraceOptimizedTests(TraceTests, unittest.TestCase):
1637db96d56Sopenharmony_ci    backend = DTraceBackend()
1647db96d56Sopenharmony_ci    optimize_python = 2
1657db96d56Sopenharmony_ci
1667db96d56Sopenharmony_ci
1677db96d56Sopenharmony_ciclass SystemTapNormalTests(TraceTests, unittest.TestCase):
1687db96d56Sopenharmony_ci    backend = SystemTapBackend()
1697db96d56Sopenharmony_ci    optimize_python = 0
1707db96d56Sopenharmony_ci
1717db96d56Sopenharmony_ci
1727db96d56Sopenharmony_ciclass SystemTapOptimizedTests(TraceTests, unittest.TestCase):
1737db96d56Sopenharmony_ci    backend = SystemTapBackend()
1747db96d56Sopenharmony_ci    optimize_python = 2
1757db96d56Sopenharmony_ci
1767db96d56Sopenharmony_ci
1777db96d56Sopenharmony_ciif __name__ == '__main__':
1787db96d56Sopenharmony_ci    unittest.main()
179