# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from collections import defaultdict
import time

from . import base
from ..objects import testcase
from ..outproc import base as outproc

class CombinerProc(base.TestProc):
  """Processor that groups incoming tests and emits combined tests.

  Tests accepted by next_test() are stored per suite in TestGroups instances.
  New tests are produced by sampling one group and passing the sample to the
  suite's combiner.
  """

  def __init__(self, rng, min_group_size, max_group_size, count):
    """
    Args:
      rng: random number generator
      min_group_size: minimum number of tests to combine
      max_group_size: maximum number of tests to combine
      count: how many tests to generate. 0 means infinite running
    """
    super(CombinerProc, self).__init__()

    self._rng = rng
    self._min_size = min_group_size
    self._max_size = max_group_size
    self._count = count

    # Index of the last generated test
    self._current_num = 0

    # {suite name: instance of TestGroups}
    self._groups = defaultdict(TestGroups)

    # {suite name: instance of TestCombiner}
    self._combiners = {}

  def setup(self, requirement=base.DROP_RESULT):
    """Forwards setup to the next processor; only DROP_RESULT is accepted."""
    # Combiner is not able to pass results (even as None) to the previous
    # processor.
    assert requirement == base.DROP_RESULT
    self._next_proc.setup(base.DROP_RESULT)

  def next_test(self, test):
    """Stores the test in its suite's groups.

    Returns:
      False if no group key could be determined for the test (it is then not
      stored), True otherwise.
    """
    group_key = self._get_group_key(test)
    if not group_key:
      # Test not suitable for combining
      return False

    self._groups[test.suite.name].add_test(group_key, test)
    return True

  def _get_group_key(self, test):
    """Returns the combiner's group key for the test.

    Prints a warning and returns None if the suite has no combiner.
    """
    combiner = self._get_combiner(test.suite)
    if not combiner:
      print ('>>> Warning: There is no combiner for %s testsuite' %
             test.suite.name)
      return None
    return combiner.get_group_key(test)

  def result_for(self, test, result):
    """Ignores the reported result and tries to send one more combined test."""
    self._send_next_test()

  def generate_initial_tests(self, num=1):
    """Attempts to send `num` combined tests."""
    for _ in range(num):
      self._send_next_test()

  def _send_next_test(self):
    """Creates and sends one combined test.

    Returns:
      False if the processor is stopped, the configured count is reached or no
      test could be created; otherwise the value returned by _send_test().
    """
    if self.is_stopped:
      return False

    # A count of 0 disables the limit.
    if self._count and self._current_num >= self._count:
      return False

    combined_test = self._create_new_test()
    if not combined_test:
      # Not enough tests
      return False

    return self._send_test(combined_test)

  def _create_new_test(self):
    """Builds a combined test from a random sample of one suite's tests.

    Returns:
      The value returned by the combiner's combine(), or None when there is
      nothing to sample from.
    """
    if not self._groups:
      # No test has been accepted yet. Selecting a suite would call
      # randint(0, -1), which raises ValueError, so report "no test" instead.
      return None

    suite, combiner = self._select_suite()
    groups = self._groups[suite]

    max_size = self._rng.randint(self._min_size, self._max_size)
    sample = groups.sample(self._rng, max_size)
    if not sample:
      return None

    self._current_num += 1
    return combiner.combine('%s-%d' % (suite, self._current_num), sample)

  def _select_suite(self):
    """Returns pair (suite name, combiner).

    The suite is picked uniformly among the suites that have stored tests.
    Must only be called when at least one suite has been registered in
    self._groups.
    """
    selected = self._rng.randint(0, len(self._groups) - 1)
    for n, suite in enumerate(self._groups):
      if n == selected:
        return suite, self._combiners[suite]

  def _get_combiner(self, suite):
    """Returns the combiner for the suite, caching it by suite name."""
    combiner = self._combiners.get(suite.name)
    if not combiner:
      combiner = suite.get_test_combiner()
      self._combiners[suite.name] = combiner
    return combiner


class TestGroups(object):
  """Tests of a single suite, bucketed by group key."""

  def __init__(self):
    # {group key: list of tests}
    self._groups = defaultdict(list)
    # One entry per added test, so a group's chance of being chosen in
    # sample() is proportional to the number of tests it holds.
    self._keys = []

  def add_test(self, key, test):
    """Appends the test to the group identified by key."""
    self._groups[key].append(test)
    self._keys.append(key)

  def sample(self, rng, max_size):
    """Returns max_size tests drawn from one randomly chosen group.

    Tests are drawn independently with rng.choice, so the same test may occur
    more than once in the result.

    Returns:
      A list of max_size tests, or None if no test has been added.
    """
    # Not enough tests
    if not self._groups:
      return None

    group_key = rng.choice(self._keys)
    tests = self._groups[group_key]
    return [rng.choice(tests) for _ in range(max_size)]