1#encoding=utf-8
2'''
3======================================================================================
4版权 (C) 2015-2020, Huawei Technologies Co., HUTAF xDevice
5========================================================================================
6@FileName:    time_info.py
7@Function:    时间格式化方法,以及timeout注释
8@Author:
9@Date:
10======================================================================================
11'''
12
13import datetime
14import sys
15import threading
16import time
17import platform
18from util.log_info import logger
# Module-level list of killable thread objects. Timeout.__init__ iterates it
# and calls kill() on every entry that is not None.
# NOTE(review): nothing visible in this file appends to the list; presumably
# other modules register their threads here -- confirm against callers.
thead = []
20
21
def get_timestamp():
    """Return the current time as reported by ``time.time()``.

    :return: seconds since the epoch as a float
    """
    current = time.time()
    return current
24
def get_time_str_info(time_stamp):
    """Format an epoch timestamp as a local-time ``YYYY-MM-DD HH:MM:SS`` string.

    :param time_stamp: seconds since the epoch, passed to ``time.localtime``
    :return: the formatted local-time string
    """
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_stamp))
29
def time_info_to_timestamp(timeinfo):
    """Convert a ``YYYY-MM-DD HH:MM:SS`` local-time string to an epoch timestamp.

    :param timeinfo: time string in the ``%Y-%m-%d %H:%M:%S`` layout
    :return: seconds since the epoch as returned by ``time.mktime``
    :raises ValueError: if ``timeinfo`` does not match the expected layout
    """
    return time.mktime(time.strptime(timeinfo, "%Y-%m-%d %H:%M:%S"))
33
def get_now_time_str_info():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS,ffffff``.

    The part after the comma is the microsecond component, always rendered
    as six zero-padded digits so the string has a fixed width and a small
    value such as 5 microseconds reads as ``,000005`` rather than ``,5``.

    :return: the formatted timestamp string
    """
    # %f is the zero-padded, 6-digit microsecond directive of datetime.strftime.
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f")
40
def get_now_time_info():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``.

    :return: the formatted timestamp string, with second resolution
    """
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
45
class KThread(threading.Thread):
    """A ``threading.Thread`` that can be asked to stop through a trace hook.

    ``start()`` swaps ``run`` for a wrapper that installs a trace function
    with ``sys.settrace`` inside the new thread. Once ``kill()`` has set the
    ``killed`` flag, the next "line" trace event seen by the local tracer
    raises ``SystemExit`` in that thread.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``threading.Thread`` and clear the kill flag."""
        super(KThread, self).__init__(*args, **kwargs)
        self.killed = False

    def start(self):
        """Start the thread with ``run`` replaced by the tracing wrapper."""
        # Keep the real run() so the wrapper can call it after installing the trace.
        self.__run_backup = self.run
        self.run = self.__run
        super(KThread, self).start()

    def __run(self):
        """Install the trace hook, run the original ``run()``, then restore it."""
        sys.settrace(self.global_trace)
        self.__run_backup()
        self.run = self.__run_backup

    def global_trace(self, frame, why, arg):
        """Return the local tracer for "call" events and ``None`` otherwise."""
        return self.local_trace if why == "call" else None

    def local_trace(self, frame, why, arg):
        """Raise ``SystemExit`` on a "line" event once the thread was killed."""
        if self.killed and why == "line":
            raise SystemExit()
        return self.local_trace

    def kill(self):
        """Set the flag that makes the local tracer stop the thread."""
        self.killed = True
75
class Timeout(Exception):
    """Exception raised by the ``timeout`` decorator when a call runs too long.

    Constructing the exception has a side effect: ``kill()`` is called on
    every entry of the module-level ``thead`` list that is not ``None``.
    """

    def __init__(self, arg):
        """Request termination of all registered threads, then store ``arg``.

        :param arg: the exception message, passed on to ``Exception``
        """
        for worker in thead:
            # The list itself is left untouched; entries are only asked to stop.
            if worker is not None:
                worker.kill()
        super(Timeout, self).__init__(arg)
83
def timeout(seconds=300):
    """Build a decorator that limits how long the wrapped function may run.

    The wrapped function is executed in a daemon ``KThread`` while the caller
    waits up to ``seconds`` for it to finish.

    :param seconds: maximum time to wait for the wrapped call, default 300
    :return: a decorator; the decorated function returns the wrapped
        function's return value
    :raises Timeout: (from the decorated function) if the worker thread is
        still alive after ``seconds``
    :raises Exception: (from the decorated function) with message
        ``"TESTCASE FAILED"`` if the worker ended without producing a result,
        e.g. because the wrapped function raised
    """
    import functools  # local import keeps this block self-contained

    def timeout_decorator(func):
        def _collect(oldfunc, result, oldfunc_args, oldfunc_kwargs):
            # Runs in the worker thread; the shared list carries the return
            # value back to the waiting caller.
            result.append(oldfunc(*oldfunc_args, **oldfunc_kwargs))

        @functools.wraps(func)
        def _(*args, **kwargs):
            result = []
            worker = KThread(
                target=_collect,
                kwargs={
                    "oldfunc": func,
                    "result": result,
                    "oldfunc_args": args,
                    "oldfunc_kwargs": kwargs,
                },
            )
            worker.daemon = True
            worker.start()
            worker.join(seconds)
            # is_alive() exists on every platform; the camelCase isAlive()
            # alias was removed in Python 3.9.
            alive = worker.is_alive()
            worker.kill()
            if alive:
                raise Timeout(u"function run too long, timeout %d seconds." % seconds)
            if not result:
                # NOTE(review): this path is reached when the worker finished
                # without appending a result, so the "run too long" wording
                # may be misleading; message kept unchanged for compatibility.
                logger.info(u"function run too long, timeout %d seconds." % seconds)
                raise Exception("TESTCASE FAILED")
            return result[0]

        return _

    return timeout_decorator