17db96d56Sopenharmony_ciimport logging 27db96d56Sopenharmony_ciimport collections 37db96d56Sopenharmony_ci 47db96d56Sopenharmony_cifrom .case import _BaseTestCaseContext 57db96d56Sopenharmony_ci 67db96d56Sopenharmony_ci 77db96d56Sopenharmony_ci_LoggingWatcher = collections.namedtuple("_LoggingWatcher", 87db96d56Sopenharmony_ci ["records", "output"]) 97db96d56Sopenharmony_ci 107db96d56Sopenharmony_ciclass _CapturingHandler(logging.Handler): 117db96d56Sopenharmony_ci """ 127db96d56Sopenharmony_ci A logging handler capturing all (raw and formatted) logging output. 137db96d56Sopenharmony_ci """ 147db96d56Sopenharmony_ci 157db96d56Sopenharmony_ci def __init__(self): 167db96d56Sopenharmony_ci logging.Handler.__init__(self) 177db96d56Sopenharmony_ci self.watcher = _LoggingWatcher([], []) 187db96d56Sopenharmony_ci 197db96d56Sopenharmony_ci def flush(self): 207db96d56Sopenharmony_ci pass 217db96d56Sopenharmony_ci 227db96d56Sopenharmony_ci def emit(self, record): 237db96d56Sopenharmony_ci self.watcher.records.append(record) 247db96d56Sopenharmony_ci msg = self.format(record) 257db96d56Sopenharmony_ci self.watcher.output.append(msg) 267db96d56Sopenharmony_ci 277db96d56Sopenharmony_ci 287db96d56Sopenharmony_ciclass _AssertLogsContext(_BaseTestCaseContext): 297db96d56Sopenharmony_ci """A context manager for assertLogs() and assertNoLogs() """ 307db96d56Sopenharmony_ci 317db96d56Sopenharmony_ci LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s" 327db96d56Sopenharmony_ci 337db96d56Sopenharmony_ci def __init__(self, test_case, logger_name, level, no_logs): 347db96d56Sopenharmony_ci _BaseTestCaseContext.__init__(self, test_case) 357db96d56Sopenharmony_ci self.logger_name = logger_name 367db96d56Sopenharmony_ci if level: 377db96d56Sopenharmony_ci self.level = logging._nameToLevel.get(level, level) 387db96d56Sopenharmony_ci else: 397db96d56Sopenharmony_ci self.level = logging.INFO 407db96d56Sopenharmony_ci self.msg = None 417db96d56Sopenharmony_ci self.no_logs = no_logs 427db96d56Sopenharmony_ci 437db96d56Sopenharmony_ci def __enter__(self): 447db96d56Sopenharmony_ci if isinstance(self.logger_name, logging.Logger): 457db96d56Sopenharmony_ci logger = self.logger = self.logger_name 467db96d56Sopenharmony_ci else: 477db96d56Sopenharmony_ci logger = self.logger = logging.getLogger(self.logger_name) 487db96d56Sopenharmony_ci formatter = logging.Formatter(self.LOGGING_FORMAT) 497db96d56Sopenharmony_ci handler = _CapturingHandler() 507db96d56Sopenharmony_ci handler.setLevel(self.level) 517db96d56Sopenharmony_ci handler.setFormatter(formatter) 527db96d56Sopenharmony_ci self.watcher = handler.watcher 537db96d56Sopenharmony_ci self.old_handlers = logger.handlers[:] 547db96d56Sopenharmony_ci self.old_level = logger.level 557db96d56Sopenharmony_ci self.old_propagate = logger.propagate 567db96d56Sopenharmony_ci logger.handlers = [handler] 577db96d56Sopenharmony_ci logger.setLevel(self.level) 587db96d56Sopenharmony_ci logger.propagate = False 597db96d56Sopenharmony_ci if self.no_logs: 607db96d56Sopenharmony_ci return 617db96d56Sopenharmony_ci return handler.watcher 627db96d56Sopenharmony_ci 637db96d56Sopenharmony_ci def __exit__(self, exc_type, exc_value, tb): 647db96d56Sopenharmony_ci self.logger.handlers = self.old_handlers 657db96d56Sopenharmony_ci self.logger.propagate = self.old_propagate 667db96d56Sopenharmony_ci self.logger.setLevel(self.old_level) 677db96d56Sopenharmony_ci 687db96d56Sopenharmony_ci if exc_type is not None: 697db96d56Sopenharmony_ci # let unexpected exceptions pass through 707db96d56Sopenharmony_ci return False 717db96d56Sopenharmony_ci 727db96d56Sopenharmony_ci if self.no_logs: 737db96d56Sopenharmony_ci # assertNoLogs 747db96d56Sopenharmony_ci if len(self.watcher.records) > 0: 757db96d56Sopenharmony_ci self._raiseFailure( 767db96d56Sopenharmony_ci "Unexpected logs found: {!r}".format( 777db96d56Sopenharmony_ci self.watcher.output 787db96d56Sopenharmony_ci ) 797db96d56Sopenharmony_ci ) 807db96d56Sopenharmony_ci 817db96d56Sopenharmony_ci else: 827db96d56Sopenharmony_ci # assertLogs 837db96d56Sopenharmony_ci if len(self.watcher.records) == 0: 847db96d56Sopenharmony_ci self._raiseFailure( 857db96d56Sopenharmony_ci "no logs of level {} or higher triggered on {}" 867db96d56Sopenharmony_ci .format(logging.getLevelName(self.level), self.logger.name)) 87