#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import time

from devicetest.core.variables import DeccVariable
from devicetest.log.logger import DeviceTestLog as log
from devicetest.utils.file_util import create_dir
from xdevice import stop_standing_subprocess
from xdevice import DeviceConnectorType
from xdevice import TestDeviceState

LOCAL_IP = "127.0.0.1"
LOCAL_PORT = 6001
URL = "/"
FORWARD_PORT = 9501


class ScreenAgent:
    """Screenshot helper bound to one device.

    Instances are cached in ``SCREEN_AGENT_MAP`` keyed by the device serial
    number (see ``get_instance`` / ``remove_instance``).
    """

    # device_sn -> ScreenAgent instance
    SCREEN_AGENT_MAP = {}

    def __init__(self, device):
        """Bind the agent to ``device``; no process, thread or port is set up here."""
        self._device = device
        self.log = device.log
        self.proc = None
        self.thread = None
        self.local_port = None
        self.is_server_started = False

    def __del__(self):
        self.terminate()

    @classmethod
    def get_instance(cls, _device):
        """Return the cached agent for ``_device``, creating and caching one if absent."""
        _device.log.debug("in get instance.")
        instance_sn = _device.device_sn
        if instance_sn in ScreenAgent.SCREEN_AGENT_MAP:
            return ScreenAgent.SCREEN_AGENT_MAP[instance_sn]

        agent = ScreenAgent(_device)
        ScreenAgent.SCREEN_AGENT_MAP[instance_sn] = agent
        _device.log.debug("out get instance.")
        return agent

    @classmethod
    def remove_instance(cls, _device):
        """Terminate and drop the cached agent for ``_device``, if there is one."""
        _sn = _device.device_sn
        if _sn in ScreenAgent.SCREEN_AGENT_MAP:
            ScreenAgent.SCREEN_AGENT_MAP[_sn].terminate()
            del ScreenAgent.SCREEN_AGENT_MAP[_sn]

    @classmethod
    def get_screenshot_dir(cls):
        """Return ``<case_screenshot_dir>/<suite_name>/<case name>`` for the current case."""
        cur_case = DeccVariable.cur_case()
        return os.path.join(cur_case.case_screenshot_dir, cur_case.suite_name, cur_case.name)

    @classmethod
    def get_take_picture_path(cls, _device, picture_name,
                              ext=".png", exe_type="takeImage"):
        """Build the path and file name under which a capture will be saved.

        If ``picture_name`` is an existing file it is returned unchanged,
        together with its base name. Otherwise the file name is
        ``<sn>.<picture_name><counter><ext>`` inside the current case's
        screenshot directory, where the counter depends on ``exe_type``:

        * ``"takeImage"``   -> current case ``image_num``
        * ``"videoRecord"`` -> current case ``video_num``
        * ``"stepImage"``   -> no counter
        * anything else     -> current case ``dump_xml_num``

        The counter that was used is incremented afterwards.

        Returns:
            tuple: ``(full_path, save_name)``.
        """
        if os.path.isfile(picture_name):
            folder = os.path.dirname(picture_name)
            create_dir(folder)
            return picture_name, os.path.basename(picture_name)

        folder = cls.get_screenshot_dir()
        create_dir(folder)
        # Remove only the trailing extension. str.strip(ext) would treat ext as
        # a set of characters and eat matching characters from both ends
        # (e.g. "picture.png" -> "icture").
        if ext and picture_name.endswith(ext):
            picture_name = picture_name[:-len(ext)]

        if exe_type == "stepImage":
            counter_attr = None
        elif exe_type == "takeImage":
            counter_attr = "image_num"
        elif exe_type == "videoRecord":
            counter_attr = "video_num"
        else:
            counter_attr = "dump_xml_num"

        # "?" and ":" in the serial number are replaced before it is used in a file name.
        safe_sn = _device.device_sn.replace("?", "sn").replace(":", "_")
        cur_case = DeccVariable.cur_case()
        if counter_attr is None:
            save_name = "{}.{}{}".format(safe_sn, picture_name, ext)
        else:
            save_name = "{}.{}{}{}".format(
                safe_sn, picture_name, getattr(cur_case, counter_attr), ext)

        fol_path = os.path.join(folder, save_name)
        if counter_attr is not None:
            setattr(cur_case, counter_attr, getattr(cur_case, counter_attr) + 1)
        return fol_path, save_name

    @classmethod
    def screen_take_picture(cls, args, result, _ta=None, is_raise_exception=True):
        """Placeholder; intentionally does nothing."""
        # When the phone is off, you can set the screenshot off function
        pass

    @classmethod
    def _do_capture(cls, _device, link, path, title, ext=".png"):
        """Capture the device screen to ``path`` and log an HTML preview.

        Returns:
            tuple: ``(path, link)``; ``('', '')`` when the device reports a
            state other than ONLINE.
        """
        # Do not capture while the device is disconnected.
        if hasattr(_device, 'test_device_state') and \
                _device.test_device_state != TestDeviceState.ONLINE:
            _device.log.warning("device is offline")
            return '', ''

        if hasattr(_device, "capture"):
            # Capturing requires the device object to implement capture().
            link, path = _device.capture(link, path, ext)
            # Shrink the screenshot; skipped when capture() yielded no path,
            # since there is no file to read.
            if path:
                cls.compress_image(path)
        else:
            _device.log.debug("The device not implement capture function, don't capture!")
        if path and link:
            _device.log.info(
                '<a href="{}" target="_blank">Screenshot: {}'
                '<img style="display: block;" {} title="{}" src="{}"/>'
                '</a>'.format(link, path, cls.resize_image(path), title, link))
        return path, link

    @classmethod
    def __screen_and_save_picture(cls, _device, name, ext=".png", exe_type="takeImage"):
        """Capture the device screen and save it.

        Args:
            _device: the device object to capture.
            name: picture name; the full save path is derived via
                ``get_image_dir_path``.
            ext: file suffix of the saved picture, ".png" or ".jpg".
            exe_type: forwarded to ``get_image_dir_path``.

        Returns:
            tuple: ``(path, link)`` as returned by ``_do_capture``.
        """
        path, link = cls.get_image_dir_path(_device, name, ext, exe_type=exe_type)
        # The screenshot file suffix may be changed inside the capture method.
        return cls._do_capture(_device, link, path, name, ext)

    @classmethod
    def capture_step_picture(cls, file_name, step_name, _device, ext=".png"):
        """Capture a screenshot for a test step and save it.

        Args:
            file_name: str, name of the picture to save.
            step_name: str, name of the step (used as the image title).
            _device: object, the device object to capture.
            ext: str, file suffix of the saved picture, ".png" or ".jpg".

        Returns:
            tuple: ``(path, link)``; ``('', '')`` if anything raised.
        """
        try:
            path, save_name = cls.get_take_picture_path(
                _device, file_name, ext, exe_type="stepImage")
            link = os.path.join(DeccVariable.cur_case().name, save_name)
            # The screenshot file suffix may be changed inside the capture method.
            return cls._do_capture(_device, link, path, step_name, ext)
        except Exception as e:
            log.error(f"take screenshot on step failed, reason: {e}")
        return '', ''

    @classmethod
    def compress_image(cls, img_path, ratio=0.5, quality=80):
        """Scale the image at ``img_path`` by ``ratio`` and rewrite it in place.

        The result is JPEG-encoded with the given ``quality`` and written back
        to ``img_path`` regardless of that path's extension. Does nothing when
        cv2 / numpy cannot be imported.
        """
        try:
            import cv2
            import numpy as np
            pic = cv2.imdecode(np.fromfile(img_path, dtype=np.uint8), -1)
            if pic is None:
                # imdecode yields None when the data cannot be decoded.
                log.warning("compress image failed, can not decode: {}".format(img_path))
                return
            # Only the first two dimensions are needed; single-channel images
            # have a 2-tuple shape, so a 3-way unpack would fail on them.
            height, width = pic.shape[:2]
            width, height = (width * ratio, height * ratio)
            pic = cv2.resize(pic, (int(width), int(height)))
            params = [cv2.IMWRITE_JPEG_QUALITY, quality]
            cv2.imencode('.jpeg', pic, params=params)[1].tofile(img_path)
        except (ImportError, NameError):
            pass

    @classmethod
    def get_image_dir_path(cls, _device, name, ext=".png", exe_type="takeImage"):
        """Return ``(path, link)`` for a new capture named after the device time.

        The device's current date (``%Y%m%d_%H%M%S``) is prefixed to ``name``
        when it can be read; on failure ``name`` is used unchanged.
        ``exe_type`` is forwarded to ``get_take_picture_path``.
        """
        try:
            if hasattr(_device, "is_oh") or hasattr(_device, "is_mac"):
                phone_time = _device.execute_shell_command("date '+%Y%m%d_%H%M%S'").strip()
            else:
                phone_time = _device.connector.shell("date '+%Y%m%d_%H%M%S'").strip()
        except Exception as exception:
            _device.log.error("get date exception error")
            _device.log.debug("get date exception: {}".format(exception))
        else:
            name = "{}.{}".format(phone_time, name)
        path, save_name = cls.get_take_picture_path(_device, name, ext, exe_type)
        link = os.path.join(DeccVariable.cur_case().name, save_name)
        return path, link

    @classmethod
    def resize_image(cls, file_path, max_height=480, file_type="image"):
        """Return HTML ``width="…" height="…"`` attributes for a preview.

        The media's real size is read with Pillow (``file_type="image"``) or
        OpenCV (``file_type="video"``); 1080x1920 is assumed when the file is
        missing or its size cannot be read. Media shorter than ``max_height``
        keep their size, otherwise they are scaled down to ``max_height``.
        """
        width, height = 1080, 1920
        ratio = 1
        try:
            if os.path.exists(file_path):
                if file_type == "image":
                    from PIL import Image
                    # Context manager closes the file even if reading the size fails.
                    with Image.open(file_path) as img:
                        width, height = img.width, img.height
                elif file_type == "video":
                    import cv2
                    try:
                        video_info = cv2.VideoCapture(file_path)
                        width = int(video_info.get(cv2.CAP_PROP_FRAME_WIDTH))
                        height = int(video_info.get(cv2.CAP_PROP_FRAME_HEIGHT))
                        video_info.release()
                    except Exception as e:
                        log.warning("get video width and height error: {}, use default".format(e))
                    if width == 0 or height == 0:
                        width, height = 1080, 1920
            if height < max_height:
                return 'width="%d" height="%d"' % (width, height)
            ratio = max_height / height
        except ImportError:
            log.error("Pillow or opencv-python is not installed ")
        except ZeroDivisionError:
            log.error("shot image height is 0")
        return 'width="%d" height="%d"' % (width * ratio, max_height)

    def terminate(self):
        """Release what the agent holds.

        Removes the port forward for ``local_port`` (when it is an int), stops
        the standing subprocess, and waits up to 3 seconds for the worker
        thread to finish.
        """
        if self.local_port is not None and isinstance(self.local_port, int):
            if hasattr(self._device, "is_oh") or \
                    self._device.usb_type == DeviceConnectorType.hdc:
                self._device.connector_command('fport rm tcp:{}'.format(self.local_port))
            else:
                self._device.connector_command('forward --remove tcp:{}'.format(self.local_port))
        if self.proc is not None:
            stop_standing_subprocess(self.proc)
        if self.thread is not None:
            start = time.time()
            # When the task ends, wait for the picture to finish being generated.
            # Thread.isAlive() was removed in Python 3.9; is_alive() is the supported name.
            while self.thread.is_alive() and time.time() - start < 3:
                time.sleep(0.1)