1ba991379Sopenharmony_ci#encoding=utf-8 2ba991379Sopenharmony_ci''' 3ba991379Sopenharmony_ci====================================================================================== 4ba991379Sopenharmony_ci版权 (C) 2015-2020, Huawei Technologies Co., HUTAF xDevice 5ba991379Sopenharmony_ci======================================================================================== 6ba991379Sopenharmony_ci@FileName: time_info.py 7ba991379Sopenharmony_ci@Function: 时间格式化方法,以及timeout注释 8ba991379Sopenharmony_ci@Author: 9ba991379Sopenharmony_ci@Date: 10ba991379Sopenharmony_ci====================================================================================== 11ba991379Sopenharmony_ci''' 12ba991379Sopenharmony_ci 13ba991379Sopenharmony_ciimport datetime 14ba991379Sopenharmony_ciimport sys 15ba991379Sopenharmony_ciimport threading 16ba991379Sopenharmony_ciimport time 17ba991379Sopenharmony_ciimport platform 18ba991379Sopenharmony_cifrom util.log_info import logger 19ba991379Sopenharmony_cithead = [] 20ba991379Sopenharmony_ci 21ba991379Sopenharmony_ci 22ba991379Sopenharmony_cidef get_timestamp(): 23ba991379Sopenharmony_ci return time.time() 24ba991379Sopenharmony_ci 25ba991379Sopenharmony_cidef get_time_str_info(time_stamp): 26ba991379Sopenharmony_ci st = time.localtime(time_stamp) 27ba991379Sopenharmony_ci result = time.strftime("%Y-%m-%d %H:%M:%S", st) 28ba991379Sopenharmony_ci return result 29ba991379Sopenharmony_ci 30ba991379Sopenharmony_cidef time_info_to_timestamp(timeinfo): 31ba991379Sopenharmony_ci st = time.strptime(timeinfo, "%Y-%m-%d %H:%M:%S") 32ba991379Sopenharmony_ci return time.mktime(st) 33ba991379Sopenharmony_ci 34ba991379Sopenharmony_cidef get_now_time_str_info(): 35ba991379Sopenharmony_ci now = datetime.datetime.now() 36ba991379Sopenharmony_ci time_str = now.strftime("%Y-%m-%d %H:%M:%S") 37ba991379Sopenharmony_ci microsecond = now.microsecond 38ba991379Sopenharmony_ci result = "%s,%s" % (time_str, microsecond) 39ba991379Sopenharmony_ci return result 40ba991379Sopenharmony_ci 41ba991379Sopenharmony_cidef get_now_time_info(): 42ba991379Sopenharmony_ci now = datetime.datetime.now() 43ba991379Sopenharmony_ci time_str = now.strftime("%Y-%m-%d %H:%M:%S") 44ba991379Sopenharmony_ci return time_str 45ba991379Sopenharmony_ci 46ba991379Sopenharmony_ciclass KThread(threading.Thread): 47ba991379Sopenharmony_ci def __init__(self, *args, **kwargs): 48ba991379Sopenharmony_ci threading.Thread.__init__(self, *args, **kwargs) 49ba991379Sopenharmony_ci self.killed = False 50ba991379Sopenharmony_ci 51ba991379Sopenharmony_ci def start(self): 52ba991379Sopenharmony_ci self.__run_backup = self.run 53ba991379Sopenharmony_ci self.run = self.__run # Force the Thread to install our trace. 54ba991379Sopenharmony_ci threading.Thread.start(self) 55ba991379Sopenharmony_ci 56ba991379Sopenharmony_ci def __run(self): 57ba991379Sopenharmony_ci sys.settrace(self.global_trace) 58ba991379Sopenharmony_ci self.__run_backup() 59ba991379Sopenharmony_ci self.run = self.__run_backup 60ba991379Sopenharmony_ci 61ba991379Sopenharmony_ci def global_trace(self, frame, why, arg): 62ba991379Sopenharmony_ci if why == "call": 63ba991379Sopenharmony_ci return self.local_trace 64ba991379Sopenharmony_ci else: 65ba991379Sopenharmony_ci return None 66ba991379Sopenharmony_ci 67ba991379Sopenharmony_ci def local_trace(self, frame, why, arg): 68ba991379Sopenharmony_ci if self.killed: 69ba991379Sopenharmony_ci if why == "line": 70ba991379Sopenharmony_ci raise SystemExit() 71ba991379Sopenharmony_ci return self.local_trace 72ba991379Sopenharmony_ci 73ba991379Sopenharmony_ci def kill(self): 74ba991379Sopenharmony_ci self.killed = True 75ba991379Sopenharmony_ci 76ba991379Sopenharmony_ciclass Timeout(Exception): 77ba991379Sopenharmony_ci def __init__(self, arg): 78ba991379Sopenharmony_ci for i in thead: 79ba991379Sopenharmony_ci if i is not None: 80ba991379Sopenharmony_ci i.kill() 81ba991379Sopenharmony_ci i = None 82ba991379Sopenharmony_ci super(Timeout, self).__init__(arg) 83ba991379Sopenharmony_ci 84ba991379Sopenharmony_cidef timeout(seconds = 300): 85ba991379Sopenharmony_ci def timeout_decorator(func): 86ba991379Sopenharmony_ci def _new_func(oldfunc, result, oldfunc_args, oldfunc_kwargs): 87ba991379Sopenharmony_ci result.append(oldfunc(*oldfunc_args, **oldfunc_kwargs)) 88ba991379Sopenharmony_ci 89ba991379Sopenharmony_ci def _(*args, **kwargs): 90ba991379Sopenharmony_ci result = [] 91ba991379Sopenharmony_ci new_kwargs = { # create new args for _new_func, because we want to get the func return val to result list 92ba991379Sopenharmony_ci "oldfunc": func, 93ba991379Sopenharmony_ci "result": result, 94ba991379Sopenharmony_ci "oldfunc_args": args, 95ba991379Sopenharmony_ci "oldfunc_kwargs": kwargs 96ba991379Sopenharmony_ci } 97ba991379Sopenharmony_ci thd = KThread(target=_new_func, args=(), kwargs=new_kwargs) 98ba991379Sopenharmony_ci thd.setDaemon(True) 99ba991379Sopenharmony_ci thd.start() 100ba991379Sopenharmony_ci thd.join(seconds) 101ba991379Sopenharmony_ci system_type = platform.system() 102ba991379Sopenharmony_ci if system_type == "Windows": 103ba991379Sopenharmony_ci alive = thd.isAlive() 104ba991379Sopenharmony_ci else: 105ba991379Sopenharmony_ci alive = thd.is_alive() 106ba991379Sopenharmony_ci thd.kill() 107ba991379Sopenharmony_ci if alive: 108ba991379Sopenharmony_ci raise Timeout(u"function run too long, timeout %d seconds." % seconds) 109ba991379Sopenharmony_ci else: 110ba991379Sopenharmony_ci if len(result) == 0: 111ba991379Sopenharmony_ci logger.info(u"function run too long, timeout %d seconds." % seconds) 112ba991379Sopenharmony_ci raise Exception("TESTCASE FAILED") 113ba991379Sopenharmony_ci else: 114ba991379Sopenharmony_ci return result[0] 115ba991379Sopenharmony_ci _.__name__ = func.__name__ 116ba991379Sopenharmony_ci _.__doc__ = func.__doc__ 117ba991379Sopenharmony_ci return _ 118ba991379Sopenharmony_ci return timeout_decorator