16881f68fSopenharmony_ci#!/usr/bin/env python3
26881f68fSopenharmony_ci
36881f68fSopenharmony_ciimport sys
46881f68fSopenharmony_ciimport pytest
56881f68fSopenharmony_ciimport time
66881f68fSopenharmony_ciimport re
76881f68fSopenharmony_ciimport os
86881f68fSopenharmony_ciimport threading
96881f68fSopenharmony_ci
106881f68fSopenharmony_ci
116881f68fSopenharmony_ci# If a test fails, wait a moment before retrieving the captured
126881f68fSopenharmony_ci# stdout/stderr. When using a server process, this makes sure that we capture
136881f68fSopenharmony_ci# any potential output of the server that comes *after* a test has failed. For
146881f68fSopenharmony_ci# example, if a request handler raises an exception, the server first signals an
156881f68fSopenharmony_ci# error to FUSE (causing the test to fail), and then logs the exception. Without
166881f68fSopenharmony_ci# the extra delay, the exception will go into nowhere.
176881f68fSopenharmony_ci@pytest.hookimpl(hookwrapper=True)
186881f68fSopenharmony_cidef pytest_pyfunc_call(pyfuncitem):
196881f68fSopenharmony_ci    outcome = yield
206881f68fSopenharmony_ci    failed = outcome.excinfo is not None
216881f68fSopenharmony_ci    if failed:
226881f68fSopenharmony_ci        time.sleep(1)
236881f68fSopenharmony_ci
246881f68fSopenharmony_ci
256881f68fSopenharmony_ciclass OutputChecker:
266881f68fSopenharmony_ci    '''Check output data for suspicious patterns.
276881f68fSopenharmony_ci
286881f68fSopenharmony_ci    Everything written to check_output.fd will be scanned for suspicious
296881f68fSopenharmony_ci    messages and then written to sys.stdout.
306881f68fSopenharmony_ci    '''
316881f68fSopenharmony_ci
326881f68fSopenharmony_ci    def __init__(self):
336881f68fSopenharmony_ci        (fd_r, fd_w) = os.pipe()
346881f68fSopenharmony_ci        self.fd = fd_w
356881f68fSopenharmony_ci        self._false_positives = []
366881f68fSopenharmony_ci        self._buf = bytearray()
376881f68fSopenharmony_ci        self._thread = threading.Thread(target=self._loop, daemon=True, args=(fd_r,))
386881f68fSopenharmony_ci        self._thread.start()
396881f68fSopenharmony_ci
406881f68fSopenharmony_ci    def register_output(self, pattern, count=1, flags=re.MULTILINE):
416881f68fSopenharmony_ci        '''Register *pattern* as false positive for output checking
426881f68fSopenharmony_ci
436881f68fSopenharmony_ci        This prevents the test from failing because the output otherwise
446881f68fSopenharmony_ci        appears suspicious.
456881f68fSopenharmony_ci        '''
466881f68fSopenharmony_ci
476881f68fSopenharmony_ci        self._false_positives.append((pattern, flags, count))
486881f68fSopenharmony_ci
496881f68fSopenharmony_ci    def _loop(self, ifd):
506881f68fSopenharmony_ci        BUFSIZE = 128*1024
516881f68fSopenharmony_ci        ofd = sys.stdout.fileno()
526881f68fSopenharmony_ci        while True:
536881f68fSopenharmony_ci            buf = os.read(ifd, BUFSIZE)
546881f68fSopenharmony_ci            if not buf:
556881f68fSopenharmony_ci                break
566881f68fSopenharmony_ci            os.write(ofd, buf)
576881f68fSopenharmony_ci            self._buf += buf
586881f68fSopenharmony_ci
596881f68fSopenharmony_ci    def _check(self):
606881f68fSopenharmony_ci        os.close(self.fd)
616881f68fSopenharmony_ci        self._thread.join()
626881f68fSopenharmony_ci
636881f68fSopenharmony_ci        buf = self._buf.decode('utf8', errors='replace')
646881f68fSopenharmony_ci
656881f68fSopenharmony_ci        # Strip out false positives
666881f68fSopenharmony_ci        for (pattern, flags, count) in self._false_positives:
676881f68fSopenharmony_ci            cp = re.compile(pattern, flags)
686881f68fSopenharmony_ci            (buf, cnt) = cp.subn('', buf, count=count)
696881f68fSopenharmony_ci
706881f68fSopenharmony_ci        patterns = [ r'\b{}\b'.format(x) for x in
716881f68fSopenharmony_ci                     ('exception', 'error', 'warning', 'fatal', 'traceback',
726881f68fSopenharmony_ci                        'fault', 'crash(?:ed)?', 'abort(?:ed)',
736881f68fSopenharmony_ci                        'uninitiali[zs]ed') ]
746881f68fSopenharmony_ci        patterns += ['^==[0-9]+== ']
756881f68fSopenharmony_ci
766881f68fSopenharmony_ci        for pattern in patterns:
776881f68fSopenharmony_ci            cp = re.compile(pattern, re.IGNORECASE | re.MULTILINE)
786881f68fSopenharmony_ci            hit = cp.search(buf)
796881f68fSopenharmony_ci            if hit:
806881f68fSopenharmony_ci                raise AssertionError('Suspicious output to stderr (matched "%s")'
816881f68fSopenharmony_ci                                     % hit.group(0))
826881f68fSopenharmony_ci
836881f68fSopenharmony_ci@pytest.fixture()
846881f68fSopenharmony_cidef output_checker(request):
856881f68fSopenharmony_ci    checker = OutputChecker()
866881f68fSopenharmony_ci    yield checker
876881f68fSopenharmony_ci    checker._check()
886881f68fSopenharmony_ci
896881f68fSopenharmony_ci
906881f68fSopenharmony_ci# Make test outcome available to fixtures
916881f68fSopenharmony_ci# (from https://github.com/pytest-dev/pytest/issues/230)
926881f68fSopenharmony_ci@pytest.hookimpl(hookwrapper=True, tryfirst=True)
936881f68fSopenharmony_cidef pytest_runtest_makereport(item, call):
946881f68fSopenharmony_ci    outcome = yield
956881f68fSopenharmony_ci    rep = outcome.get_result()
966881f68fSopenharmony_ci    setattr(item, "rep_" + rep.when, rep)
976881f68fSopenharmony_ci    return rep
98