#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18import os
19import time
20
21from devicetest.core.variables import DeccVariable
22from devicetest.log.logger import DeviceTestLog as log
23from devicetest.utils.file_util import create_dir
24from xdevice import stop_standing_subprocess
25from xdevice import DeviceConnectorType
26from xdevice import TestDeviceState
27
# Network defaults exported by this module. None of them is referenced by the
# code visible in this file.
# NOTE(review): presumably consumed by importers that talk to an on-device
# agent through a forwarded port -- confirm before changing or removing.
LOCAL_IP = "127.0.0.1"
LOCAL_PORT = 6001
URL = "/"
FORWARD_PORT = 9501
32
33
class ScreenAgent:
    """Screenshot helper for a test device.

    Instances are cached per device serial number in ``SCREEN_AGENT_MAP``
    (see ``get_instance`` / ``remove_instance``). Most of the functionality is
    exposed as classmethods that work directly on a device object.
    """

    # device_sn -> ScreenAgent instance created by get_instance()
    SCREEN_AGENT_MAP = {}

    # exe_type -> name of the per-case counter appended to the file name.
    # "stepImage" uses no counter; every other exe_type uses "dump_xml_num".
    _COUNTER_ATTRS = {"takeImage": "image_num", "videoRecord": "video_num"}

    def __init__(self, device):
        """Bind the agent to *device*; no resources are acquired here."""
        self._device = device
        self.log = device.log
        # The three handles below are only released (never assigned) by the
        # code in this class -- presumably set by external callers; verify.
        self.proc = None
        self.thread = None
        self.local_port = None
        self.is_server_started = False

    def __del__(self):
        """Release port forward, subprocess and worker thread on collection."""
        self.terminate()

    @classmethod
    def get_instance(cls, _device):
        """Return the cached agent for ``_device.device_sn``, creating it once."""
        _device.log.debug("in get instance.")
        instance_sn = _device.device_sn
        if instance_sn in ScreenAgent.SCREEN_AGENT_MAP:
            return ScreenAgent.SCREEN_AGENT_MAP[instance_sn]

        agent = ScreenAgent(_device)
        ScreenAgent.SCREEN_AGENT_MAP[instance_sn] = agent
        _device.log.debug("out get instance.")
        return agent

    @classmethod
    def remove_instance(cls, _device):
        """Terminate and forget the cached agent of *_device*, if there is one."""
        _sn = _device.device_sn
        if _sn in ScreenAgent.SCREEN_AGENT_MAP:
            ScreenAgent.SCREEN_AGENT_MAP[_sn].terminate()
            del ScreenAgent.SCREEN_AGENT_MAP[_sn]

    @classmethod
    def get_screenshot_dir(cls):
        """Return ``<case_screenshot_dir>/<suite_name>/<case name>`` of the current case."""
        case = DeccVariable.cur_case()
        return os.path.join(case.case_screenshot_dir, case.suite_name, case.name)

    @staticmethod
    def _build_save_name(device_sn, picture_name, ext, counter=None):
        """Build ``<sn>.<picture_name>[<counter>]<ext>``.

        One trailing *ext* is removed from *picture_name* first so the
        extension is not duplicated, and ``?`` / ``:`` in the serial number
        are replaced with ``sn`` / ``_``.
        """
        # Cut the suffix explicitly: str.strip(ext) would treat ext as a set
        # of characters and also eat matching characters of the name itself.
        if ext and picture_name.endswith(ext):
            picture_name = picture_name[:-len(ext)]
        safe_sn = device_sn.replace("?", "sn").replace(":", "_")
        if counter is None:
            return "{}.{}{}".format(safe_sn, picture_name, ext)
        return "{}.{}{}{}".format(safe_sn, picture_name, counter, ext)

    @classmethod
    def get_take_picture_path(cls, _device, picture_name,
                              ext=".png", exe_type="takeImage"):
        """Return ``(full_path, file_name)`` under which a capture is saved.

        picture_name: str, base name; if it is an existing file it is
                      returned unchanged together with its basename
        ext         : str, file suffix such as ".png"
        exe_type    : str, "takeImage" (default), "videoRecord" or
                      "stepImage"; any other value (e.g. "dumpWindow") uses
                      the dump-xml counter

        The matching counter on the current case is appended to the name and
        then incremented; "stepImage" uses no counter.
        """
        if os.path.isfile(picture_name):
            folder = os.path.dirname(picture_name)
            create_dir(folder)
            return picture_name, os.path.basename(picture_name)

        folder = cls.get_screenshot_dir()
        create_dir(folder)

        if exe_type == "stepImage":
            save_name = cls._build_save_name(_device.device_sn, picture_name, ext)
        else:
            case = DeccVariable.cur_case()
            counter_attr = cls._COUNTER_ATTRS.get(exe_type, "dump_xml_num")
            counter = getattr(case, counter_attr)
            save_name = cls._build_save_name(
                _device.device_sn, picture_name, ext, counter)
            setattr(case, counter_attr, counter + 1)

        return os.path.join(folder, save_name), save_name

    @classmethod
    def screen_take_picture(cls, args, result, _ta=None, is_raise_exception=True):
        """Placeholder that does nothing and returns None."""
        # When the phone is off, you can set the screenshot off function
        pass

    @classmethod
    def _do_capture(cls, _device, link, path, title, ext=".png"):
        """Capture the device screen and log an HTML thumbnail link.

        Returns ``(path, link)``; ``('', '')`` when the device reports a
        state other than ONLINE. Devices without a ``capture`` method are
        not captured, and the incoming path/link are returned as given.
        """
        # The device is disconnected: do not take a screenshot.
        if hasattr(_device, 'test_device_state') and _device.test_device_state != TestDeviceState.ONLINE:
            _device.log.warning("device is offline")
            return '', ''

        if hasattr(_device, "capture"):
            # Capturing requires the device object to implement capture().
            link, path = _device.capture(link, path, ext)
            # Shrink the screenshot, but only if capture produced a path.
            if path:
                cls.compress_image(path)
        else:
            _device.log.debug("The device not implement capture function, don't capture!")
        if path and link:
            _device.log.info(
                '<a href="{}" target="_blank">Screenshot: {}'
                '<img style="display: block;" {} title="{}" src="{}"/>'
                '</a>'.format(link, path, cls.resize_image(path), title, link))
        return path, link

    @classmethod
    def __screen_and_save_picture(cls, _device, name, ext=".png", exe_type="takeImage"):
        """Capture the device screen and save it.

        name: str, picture name; the full path comes from get_image_dir_path
        ext : str, picture suffix, ".png" or ".jpg"
        Returns the ``(path, link)`` pair produced by _do_capture.
        """
        path, link = cls.get_image_dir_path(_device, name, ext, exe_type=exe_type)
        # The file suffix may be changed inside the capture method.
        return cls._do_capture(_device, link, path, name, ext)

    @classmethod
    def capture_step_picture(cls, file_name, step_name, _device, ext=".png"):
        """Capture and save the screenshot of a test step.

        file_name: str, name of the saved picture
        step_name: str, name of the step (used as the thumbnail title)
        _device  : object, the device object to capture
        ext      : str, picture suffix, ".png" or ".jpg"
        Returns ``(path, link)``, or ``('', '')`` if anything fails.
        """
        try:
            path, save_name = cls.get_take_picture_path(_device, file_name, ext, exe_type="stepImage")
            link = os.path.join(DeccVariable.cur_case().name, save_name)
            # The file suffix may be changed inside the capture method.
            return cls._do_capture(_device, link, path, step_name, ext)
        except Exception as e:
            log.error(f"take screenshot on step failed, reason: {e}")
        return '', ''

    @classmethod
    def compress_image(cls, img_path, ratio=0.5, quality=80):
        """Downscale the image at *img_path* in place, best effort.

        img_path: str, file to rewrite
        ratio   : float, factor applied to both width and height
        quality : int, JPEG quality passed to the encoder

        The file is rewritten with JPEG-encoded data whatever its extension.
        Nothing happens when the file is missing, cannot be decoded, or when
        opencv-python / numpy are not installed.
        """
        if not img_path or not os.path.isfile(img_path):
            return
        try:
            import cv2
            import numpy as np
            pic = cv2.imdecode(np.fromfile(img_path, dtype=np.uint8), -1)
            if pic is None:
                return
            # shape is (h, w) for grayscale and (h, w, channels) otherwise.
            height, width = pic.shape[:2]
            pic = cv2.resize(pic, (int(width * ratio), int(height * ratio)))
            params = [cv2.IMWRITE_JPEG_QUALITY, quality]
            cv2.imencode('.jpeg', pic, params=params)[1].tofile(img_path)
        except (ImportError, NameError):
            pass

    @classmethod
    def get_image_dir_path(cls, _device, name, ext=".png", exe_type="takeImage"):
        """Return ``(path, link)`` for a new capture named *name*.

        The device date (``%Y%m%d_%H%M%S``) is prepended to *name* when it
        can be read; otherwise the name is used as is. exe_type is forwarded
        to get_take_picture_path (default "takeImage", e.g. "dumpWindow").
        """
        try:
            if hasattr(_device, "is_oh") or hasattr(_device, "is_mac"):
                phone_time = _device.execute_shell_command("date '+%Y%m%d_%H%M%S'").strip()
            else:
                phone_time = _device.connector.shell("date '+%Y%m%d_%H%M%S'").strip()
        except Exception as exception:
            _device.log.error("get date exception error")
            _device.log.debug("get date exception: {}".format(exception))
        else:
            name = "{}.{}".format(phone_time, name)
        path, save_name = cls.get_take_picture_path(_device, name, ext, exe_type)
        link = os.path.join(DeccVariable.cur_case().name, save_name)
        return path, link

    @classmethod
    def resize_image(cls, file_path, max_height=480, file_type="image"):
        """Return HTML ``width="..." height="..."`` attributes for a thumbnail.

        file_path : str, image or video file to measure
        max_height: int, maximum thumbnail height in pixels
        file_type : str, "image" (measured with Pillow) or "video"
                    (measured with opencv-python)

        Media lower than *max_height* keep their size; anything else is
        scaled to *max_height* keeping the aspect ratio. 1080x1920 is assumed
        when the file is missing or cannot be measured.
        """
        width, height = 1080, 1920
        try:
            if os.path.exists(file_path):
                if file_type == "image":
                    from PIL import Image
                    try:
                        with Image.open(file_path) as img:
                            width, height = img.width, img.height
                    except OSError as e:
                        log.warning("get image width and height error: {}, use default".format(e))
                elif file_type == "video":
                    import cv2
                    try:
                        video_info = cv2.VideoCapture(file_path)
                        try:
                            width = int(video_info.get(cv2.CAP_PROP_FRAME_WIDTH))
                            height = int(video_info.get(cv2.CAP_PROP_FRAME_HEIGHT))
                        finally:
                            video_info.release()
                    except Exception as e:
                        log.warning("get video width and height error: {}, use default".format(e))
                    if width == 0 or height == 0:
                        width, height = 1080, 1920
        except ImportError:
            log.error("Pillow or opencv-python is not installed ")

        # Scale after probing so the fallback size keeps its aspect ratio too.
        if height < max_height:
            return 'width="%d" height="%d"' % (width, height)
        try:
            ratio = max_height / height
        except ZeroDivisionError:
            log.error("shot image height is 0")
            ratio = 1
        return 'width="%d" height="%d"' % (width * ratio, max_height)

    def terminate(self):
        """Release the port forward, the subprocess and the worker thread.

        Safe to call repeatedly: every handle is cleared before it is
        released, so a later call (for example from ``__del__`` after
        ``remove_instance``) does not repeat the work.
        """
        port, self.local_port = self.local_port, None
        if isinstance(port, int):
            if hasattr(self._device, "is_oh") or \
                    self._device.usb_type == DeviceConnectorType.hdc:
                self._device.connector_command('fport rm tcp:{}'.format(port))
            else:
                self._device.connector_command('forward --remove tcp:{}'.format(port))

        proc, self.proc = self.proc, None
        if proc is not None:
            stop_standing_subprocess(proc)

        worker, self.thread = self.thread, None
        if worker is not None:
            start = time.time()
            # Give the worker up to 3 seconds to finish writing pictures.
            while worker.is_alive() and time.time() - start < 3:
                time.sleep(0.1)
244