# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import traceback

from . import base
from ..local import pool


# Global function for multiprocessing, because pickling a static method doesn't
# work on Windows.
def run_job(job, process_context):
  """Run `job` with `process_context` and return whatever `job.run` returns."""
  return job.run(process_context)


def create_process_context(result_reduction):
  """Build the ProcessContext that carries `result_reduction`."""
  return ProcessContext(result_reduction)


# Pairs a test id with the result its output processor produced.
JobResult = collections.namedtuple('JobResult', ['id', 'result'])
# Context handed to Job.run; its only field is the result reduction that is
# forwarded to the output processor.
ProcessContext = collections.namedtuple('ProcessContext', ['result_reduction'])


class Job(object):
  """One unit of work: execute a command and process its output."""

  def __init__(self, test_id, cmd, outproc, keep_output):
    """Store everything needed to run the job later.

    Args:
      test_id: identifier returned in the JobResult so the caller can match
        the result to its test.
      cmd: object whose `execute()` produces the output to process.
      outproc: object whose `process(output, reduction)` produces the result.
      keep_output: when truthy, no reduction is passed to `outproc`.
    """
    self.test_id = test_id
    self.cmd = cmd
    self.outproc = outproc
    self.keep_output = keep_output

  def run(self, process_ctx):
    """Execute the command and return a JobResult for this job's test id.

    Args:
      process_ctx: object exposing `result_reduction` (see ProcessContext).

    Returns:
      JobResult(test_id, result) where `result` comes from `outproc.process`.
    """
    output = self.cmd.execute()
    # The reduction is skipped (None) for jobs that asked to keep their output.
    reduction = process_ctx.result_reduction if not self.keep_output else None
    result = self.outproc.process(output, reduction)
    return JobResult(self.test_id, result)
class ExecutionProc(base.TestProc):
  """Last processor in the chain. Instead of passing tests further it creates
  commands and output processors, executes them in multiple worker processes and
  sends results to the previous processor.
  """

  def __init__(self, jobs, outproc_factory=None):
    """Create the worker pool and the bookkeeping for in-flight tests.

    Args:
      jobs: forwarded as the first argument of `pool.Pool` (presumably the
        number of worker processes -- confirm against pool.Pool).
      outproc_factory: optional callable taking a test and returning its
        output processor; defaults to reading `test.output_proc`.
    """
    super(ExecutionProc, self).__init__()
    self._pool = pool.Pool(jobs, notify_fun=self.notify_previous)
    self._outproc_factory = outproc_factory or (lambda t: t.output_proc)
    # Maps test id -> (test, cmd) for tests queued in the pool whose results
    # have not been unpacked yet.
    self._tests = {}

  def connect_to(self, next_proc):
    """Always fails: this processor is the end of the chain."""
    # Raised explicitly instead of `assert False` so the guard is still
    # enforced when Python runs with -O (which removes assert statements).
    raise AssertionError('ExecutionProc cannot be connected to anything')

  def run(self):
    """Consume results from the pool until its iterator is exhausted."""
    # No jobs are supplied up front (gen=[]); they are queued one at a time by
    # next_test() through self._pool.add().
    it = self._pool.imap_unordered(
        fn=run_job,
        gen=[],
        process_context_fn=create_process_context,
        process_context_args=[self._prev_requirement],
    )
    for pool_result in it:
      self._unpack_result(pool_result)

  def next_test(self, test):
    """Queue `test` for execution.

    Returns:
      False if this processor has been stopped (the test is not queued),
      True once the job has been added to the pool.
    """
    if self.is_stopped:
      return False

    test_id = test.procid
    cmd = test.get_command()
    # Remember the test and its command so the result can be matched back.
    self._tests[test_id] = test, cmd

    outproc = self._outproc_factory(test)
    self._pool.add([Job(test_id, cmd, outproc, test.keep_output)])

    return True

  def result_for(self, test, result):
    """Always fails: nothing follows this processor, so no results arrive."""
    # Explicit raise for the same reason as in connect_to().
    raise AssertionError('ExecutionProc cannot receive results')

  def stop(self):
    """Stop this processor and abort the worker pool."""
    super(ExecutionProc, self).stop()
    self._pool.abort()

  def _unpack_result(self, pool_result):
    """Handle one item yielded by the pool.

    Heartbeat items only trigger `self.heartbeat()`. Any other item carries a
    JobResult in `pool_result.value`; the matching test is looked up, the
    command is attached to the result and the result is sent on.
    """
    if pool_result.heartbeat:
      self.heartbeat()
      return

    test_id, result = pool_result.value

    # pop() fetches and forgets the in-flight entry in a single step.
    test, result.cmd = self._tests.pop(test_id)
    self._send_result(test, result)