#!/usr/bin/env python3
# Copyright 2019 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for how VariantProc loads test variants and reports results.

The tests wire a VariantProc between a fake result observer and a fake
filter, feed it a single fake test, and inspect what the fakes recorded.
"""

import os
import sys
import tempfile
import unittest

# Needed because the test runner contains relative imports.
TOOLS_PATH = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(TOOLS_PATH)

from testrunner.testproc import base
from testrunner.testproc.variant import VariantProc


class FakeResultObserver(base.TestProcObserver):
  """Observer that records every (test, result) pair it is notified about."""

  def __init__(self):
    super(FakeResultObserver, self).__init__()

    # Set of (test, result) tuples passed to result_for.
    self.results = set()

  def result_for(self, test, result):
    """Remember that `result` was reported for `test`."""
    self.results.add((test, result))


class FakeFilter(base.TestProcFilter):
  """Filter that rejects tests matching a predicate and records the rest."""

  def __init__(self, filter_predicate):
    """Create the filter.

    Args:
      filter_predicate: callable taking a test; a truthy return value means
        the test is rejected (filtered out).
    """
    super(FakeFilter, self).__init__()

    self._filter_predicate = filter_predicate

    # Tests that were accepted by next_test.
    self.loaded = set()
    # Number of next_test invocations, accepted or not.
    self.call_counter = 0

  def next_test(self, test):
    """Count the call; return False if `test` is filtered, True otherwise.

    Accepted tests are added to `self.loaded`.
    """
    self.call_counter += 1

    if self._filter_predicate(test):
      return False

    self.loaded.add(test)
    return True


class FakeSuite(object):
  """Minimal stand-in for a test suite; only carries a name."""

  def __init__(self, name):
    self.name = name


class FakeTest(object):
  """Minimal stand-in for a test case.

  Instances serve both as the original test fed to VariantProc and as the
  pre-built "variants" that FakeVariantGen hands out.
  """

  def __init__(self, procid):
    self.suite = FakeSuite("fake_suite")
    self.procid = procid

    self.keep_output = False

  def create_subtest(self, proc, subtest_id, **kwargs):
    """Return the pre-built variant passed as `variant`, tagged with its origin.

    No new object is created: the object supplied under the `variant`
    keyword gets its `origin` attribute set to this test and is returned.

    Raises:
      KeyError: if no `variant` keyword argument is supplied.
    """
    variant = kwargs['variant']

    variant.origin = self
    return variant


class FakeVariantGen(object):
  """Variant generator that yields a fixed, pre-built list of variants."""

  def __init__(self, variants):
    self._variants = variants

  def gen(self, test):
    """Yield (variant, [], "fake_suffix") for each stored variant.

    `test` is ignored; the same variants are produced for any input.
    """
    for variant in self._variants:
      yield variant, [], "fake_suffix"


class TestVariantProcLoading(unittest.TestCase):
  """Checks VariantProc's loading and result reporting via fake processors."""

  def setUp(self):
    self.test = FakeTest("test")

  def _simulate_proc(self, variants):
    """Build the pipeline, feed `self.test` into it and report what loaded.

    Expects the list of instantiated test variants to load into the
    VariantProc.

    Args:
      variants: list of FakeTest objects handed out by the fake variant
        generator, in order.

    Returns:
      A (loaded, call_count) tuple: the set of variants accepted by the
      FakeFilter and the number of times its next_test was called.
    """
    # Creates a Variant processor containing the possible types of test
    # variants.
    self.variant_proc = VariantProc(variants=["to_filter", "to_load"])
    # Replace the real generators with the fake one, keyed by the name that
    # FakeTest's suite uses.
    self.variant_proc._variant_gens = {"fake_suite": FakeVariantGen(variants)}

    # FakeFilter rejects tests matching the predicate; every other test is
    # loaded.
    self.fake_filter = FakeFilter(
        filter_predicate=(lambda t: t.procid == "to_filter"))

    # FakeResultObserver to verify that VariantProc calls result_for correctly.
    self.fake_result_observer = FakeResultObserver()

    # Links up processors together to form a test processing pipeline.
    self.variant_proc._prev_proc = self.fake_result_observer
    self.fake_filter._prev_proc = self.variant_proc
    self.variant_proc._next_proc = self.fake_filter

    # Injects the test into the VariantProc
    is_loaded = self.variant_proc.next_test(self.test)

    # Verifies the behavioral consistency by using the instrumentation in
    # FakeFilter: next_test must report success exactly when at least one
    # variant was accepted downstream.
    self.assertEqual(is_loaded, bool(self.fake_filter.loaded))
    return self.fake_filter.loaded, self.fake_filter.call_counter

  def test_filters_first_two_variants(self):
    """Two filtered variants are skipped; the first loadable one is loaded."""
    variants = [
        FakeTest('to_filter'),
        FakeTest('to_filter'),
        FakeTest('to_load'),
        FakeTest('to_load'),
    ]
    expected_load_results = {variants[2]}

    load_results, call_count = self._simulate_proc(variants)

    self.assertSetEqual(expected_load_results, load_results)
    self.assertEqual(call_count, 3)

  def test_stops_loading_after_first_successful_load(self):
    """Only one variant is offered downstream when the first one loads."""
    variants = [
        FakeTest('to_load'),
        FakeTest('to_load'),
        FakeTest('to_filter'),
    ]
    expected_load_results = {variants[0]}

    loaded_tests, call_count = self._simulate_proc(variants)

    self.assertSetEqual(expected_load_results, loaded_tests)
    self.assertEqual(call_count, 1)

  def test_return_result_when_out_of_variants(self):
    """The original test's result is reported once no variants remain."""
    variants = [
        FakeTest('to_filter'),
        FakeTest('to_load'),
    ]

    self._simulate_proc(variants)

    self.variant_proc.result_for(variants[1], None)

    expected_results = {(self.test, None)}

    self.assertSetEqual(expected_results, self.fake_result_observer.results)

  def test_return_result_after_running_variants(self):
    """A finished variant triggers the next load; the last one reports."""
    variants = [
        FakeTest('to_filter'),
        FakeTest('to_load'),
        FakeTest('to_load'),
    ]

    self._simulate_proc(variants)
    self.variant_proc.result_for(variants[1], None)

    # Finishing the first loaded variant must have pulled in the next one.
    self.assertSetEqual(set(variants[1:]), self.fake_filter.loaded)

    self.variant_proc.result_for(variants[2], None)

    expected_results = {(self.test, None)}
    self.assertSetEqual(expected_results, self.fake_result_observer.results)


if __name__ == '__main__':
  unittest.main()