11cb0ef41Sopenharmony_ci# Copyright 2018 the V8 project authors. All rights reserved.
21cb0ef41Sopenharmony_ci# Use of this source code is governed by a BSD-style license that can be
31cb0ef41Sopenharmony_ci# found in the LICENSE file.
41cb0ef41Sopenharmony_ci
51cb0ef41Sopenharmony_ciimport collections
61cb0ef41Sopenharmony_ciimport traceback
71cb0ef41Sopenharmony_ci
81cb0ef41Sopenharmony_cifrom . import base
91cb0ef41Sopenharmony_cifrom ..local import pool
101cb0ef41Sopenharmony_ci
111cb0ef41Sopenharmony_ci
121cb0ef41Sopenharmony_ci# Global function for multiprocessing, because pickling a static method doesn't
131cb0ef41Sopenharmony_ci# work on Windows.
141cb0ef41Sopenharmony_cidef run_job(job, process_context):
151cb0ef41Sopenharmony_ci  return job.run(process_context)
161cb0ef41Sopenharmony_ci
171cb0ef41Sopenharmony_ci
181cb0ef41Sopenharmony_cidef create_process_context(result_reduction):
191cb0ef41Sopenharmony_ci  return ProcessContext(result_reduction)
201cb0ef41Sopenharmony_ci
211cb0ef41Sopenharmony_ci
221cb0ef41Sopenharmony_ciJobResult = collections.namedtuple('JobResult', ['id', 'result'])
231cb0ef41Sopenharmony_ciProcessContext = collections.namedtuple('ProcessContext', ['result_reduction'])
241cb0ef41Sopenharmony_ci
251cb0ef41Sopenharmony_ci
261cb0ef41Sopenharmony_ciclass Job(object):
271cb0ef41Sopenharmony_ci  def __init__(self, test_id, cmd, outproc, keep_output):
281cb0ef41Sopenharmony_ci    self.test_id = test_id
291cb0ef41Sopenharmony_ci    self.cmd = cmd
301cb0ef41Sopenharmony_ci    self.outproc = outproc
311cb0ef41Sopenharmony_ci    self.keep_output = keep_output
321cb0ef41Sopenharmony_ci
331cb0ef41Sopenharmony_ci  def run(self, process_ctx):
341cb0ef41Sopenharmony_ci    output = self.cmd.execute()
351cb0ef41Sopenharmony_ci    reduction = process_ctx.result_reduction if not self.keep_output else None
361cb0ef41Sopenharmony_ci    result = self.outproc.process(output, reduction)
371cb0ef41Sopenharmony_ci    return JobResult(self.test_id, result)
381cb0ef41Sopenharmony_ci
391cb0ef41Sopenharmony_ci
401cb0ef41Sopenharmony_ciclass ExecutionProc(base.TestProc):
411cb0ef41Sopenharmony_ci  """Last processor in the chain. Instead of passing tests further it creates
421cb0ef41Sopenharmony_ci  commands and output processors, executes them in multiple worker processes and
431cb0ef41Sopenharmony_ci  sends results to the previous processor.
441cb0ef41Sopenharmony_ci  """
451cb0ef41Sopenharmony_ci
461cb0ef41Sopenharmony_ci  def __init__(self, jobs, outproc_factory=None):
471cb0ef41Sopenharmony_ci    super(ExecutionProc, self).__init__()
481cb0ef41Sopenharmony_ci    self._pool = pool.Pool(jobs, notify_fun=self.notify_previous)
491cb0ef41Sopenharmony_ci    self._outproc_factory = outproc_factory or (lambda t: t.output_proc)
501cb0ef41Sopenharmony_ci    self._tests = {}
511cb0ef41Sopenharmony_ci
521cb0ef41Sopenharmony_ci  def connect_to(self, next_proc):
531cb0ef41Sopenharmony_ci    assert False, 'ExecutionProc cannot be connected to anything'
541cb0ef41Sopenharmony_ci
551cb0ef41Sopenharmony_ci  def run(self):
561cb0ef41Sopenharmony_ci    it = self._pool.imap_unordered(
571cb0ef41Sopenharmony_ci        fn=run_job,
581cb0ef41Sopenharmony_ci        gen=[],
591cb0ef41Sopenharmony_ci        process_context_fn=create_process_context,
601cb0ef41Sopenharmony_ci        process_context_args=[self._prev_requirement],
611cb0ef41Sopenharmony_ci    )
621cb0ef41Sopenharmony_ci    for pool_result in it:
631cb0ef41Sopenharmony_ci      self._unpack_result(pool_result)
641cb0ef41Sopenharmony_ci
651cb0ef41Sopenharmony_ci  def next_test(self, test):
661cb0ef41Sopenharmony_ci    if self.is_stopped:
671cb0ef41Sopenharmony_ci      return False
681cb0ef41Sopenharmony_ci
691cb0ef41Sopenharmony_ci    test_id = test.procid
701cb0ef41Sopenharmony_ci    cmd = test.get_command()
711cb0ef41Sopenharmony_ci    self._tests[test_id] = test, cmd
721cb0ef41Sopenharmony_ci
731cb0ef41Sopenharmony_ci    outproc = self._outproc_factory(test)
741cb0ef41Sopenharmony_ci    self._pool.add([Job(test_id, cmd, outproc, test.keep_output)])
751cb0ef41Sopenharmony_ci
761cb0ef41Sopenharmony_ci    return True
771cb0ef41Sopenharmony_ci
781cb0ef41Sopenharmony_ci  def result_for(self, test, result):
791cb0ef41Sopenharmony_ci    assert False, 'ExecutionProc cannot receive results'
801cb0ef41Sopenharmony_ci
811cb0ef41Sopenharmony_ci  def stop(self):
821cb0ef41Sopenharmony_ci    super(ExecutionProc, self).stop()
831cb0ef41Sopenharmony_ci    self._pool.abort()
841cb0ef41Sopenharmony_ci
851cb0ef41Sopenharmony_ci  def _unpack_result(self, pool_result):
861cb0ef41Sopenharmony_ci    if pool_result.heartbeat:
871cb0ef41Sopenharmony_ci      self.heartbeat()
881cb0ef41Sopenharmony_ci      return
891cb0ef41Sopenharmony_ci
901cb0ef41Sopenharmony_ci    job_result = pool_result.value
911cb0ef41Sopenharmony_ci    test_id, result = job_result
921cb0ef41Sopenharmony_ci
931cb0ef41Sopenharmony_ci    test, result.cmd = self._tests[test_id]
941cb0ef41Sopenharmony_ci    del self._tests[test_id]
951cb0ef41Sopenharmony_ci    self._send_result(test, result)
96