#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import time

from devicetest.core.variables import DeccVariable
from devicetest.log.logger import DeviceTestLog as log
from devicetest.utils.file_util import create_dir
from xdevice import stop_standing_subprocess
from xdevice import DeviceConnectorType
from xdevice import TestDeviceState

LOCAL_IP = "127.0.0.1"
LOCAL_PORT = 6001
URL = "/"
FORWARD_PORT = 9501


class ScreenAgent:
    """Per-device helper that names, captures and post-processes screenshots.

    Instances are cached in ``SCREEN_AGENT_MAP`` keyed by the device serial
    number; most of the functionality is exposed through classmethods that
    take the device object explicitly.
    """

    # Cache of agents, keyed by ``device.device_sn``.
    SCREEN_AGENT_MAP = {}

    def __init__(self, device):
        """Bind the agent to ``device``; no resources are acquired here."""
        self._device = device
        self.log = device.log
        # The three attributes below are only released in ``terminate``;
        # nothing in this module assigns them a non-None value.
        # NOTE(review): presumably set by external callers -- confirm.
        self.proc = None
        self.thread = None
        self.local_port = None
        self.is_server_started = False

    def __del__(self):
        """Release forwarded ports / helper process when the agent is collected."""
        self.terminate()

    @classmethod
    def get_instance(cls, _device):
        """Return the cached agent for ``_device``, creating it on first use."""
        _device.log.debug("in get instance.")
        instance_sn = _device.device_sn
        if instance_sn in ScreenAgent.SCREEN_AGENT_MAP:
            return ScreenAgent.SCREEN_AGENT_MAP[instance_sn]

        agent = ScreenAgent(_device)
        ScreenAgent.SCREEN_AGENT_MAP[instance_sn] = agent
        _device.log.debug("out get instance.")
        return agent

    @classmethod
    def remove_instance(cls, _device):
        """Terminate and forget the cached agent for ``_device``, if any."""
        _sn = _device.device_sn
        if _sn in ScreenAgent.SCREEN_AGENT_MAP:
            ScreenAgent.SCREEN_AGENT_MAP[_sn].terminate()
            del ScreenAgent.SCREEN_AGENT_MAP[_sn]

    @classmethod
    def get_screenshot_dir(cls):
        """Return ``<case_screenshot_dir>/<suite_name>/<case name>`` for the current case."""
        cur_case = DeccVariable.cur_case()
        return os.path.join(cur_case.case_screenshot_dir, cur_case.suite_name, cur_case.name)

    @classmethod
    def get_take_picture_path(cls, _device, picture_name,
                              ext=".png", exe_type="takeImage"):
        """Build the local path and file name for a capture artifact.

        If ``picture_name`` is already an existing file it is returned as-is.
        Otherwise the file is placed in the current case's screenshot
        directory and named ``<sn>.<picture_name><counter><ext>``.

        Args:
            _device: device object; ``device_sn`` is used as file name prefix
                ("?" is replaced by "sn" and ":" by "_").
            picture_name: base name (a trailing ``ext`` is removed) or an
                existing file path.
            ext: file suffix including the dot.
            exe_type: "takeImage" uses and increments the case's
                ``image_num``; "videoRecord" uses ``video_num``; "stepImage"
                uses no counter; any other value uses ``dump_xml_num``.

        Returns:
            tuple: ``(full_path, file_name)``.
        """
        if os.path.isfile(picture_name):
            folder = os.path.dirname(picture_name)
            create_dir(folder)
            return picture_name, os.path.basename(picture_name)

        folder = cls.get_screenshot_dir()
        create_dir(folder)
        # Remove the suffix itself. ``str.strip(ext)`` would treat ``ext`` as
        # a *set of characters* and eat into the name ("login.png" -> "logi").
        if ext and picture_name.endswith(ext):
            picture_name = picture_name[:-len(ext)]

        cur_case = DeccVariable.cur_case()
        device_tag = _device.device_sn.replace("?", "sn").replace(":", "_")
        if exe_type == "stepImage":
            # Step images are not numbered.
            save_name = "{}.{}{}".format(device_tag, picture_name, ext)
        else:
            counter_attr = {
                "takeImage": "image_num",
                "videoRecord": "video_num",
            }.get(exe_type, "dump_xml_num")
            index = getattr(cur_case, counter_attr)
            save_name = "{}.{}{}{}".format(device_tag, picture_name, index, ext)
            # Advance the per-case counter so the next artifact gets a new name.
            setattr(cur_case, counter_attr, index + 1)

        return os.path.join(folder, save_name), save_name

    @classmethod
    def screen_take_picture(cls, args, result, _ta=None, is_raise_exception=True):
        """Placeholder hook; intentionally does nothing."""
        # When the phone is off, you can set the screenshot off function
        pass

    @classmethod
    def _do_capture(cls, _device, link, path, title, ext=".png"):
        """Capture a screenshot through ``_device.capture`` and log an HTML link.

        Args:
            _device: device object; must expose ``capture(link, path, ext)``
                for a screenshot to be taken.
            link: link to embed in the log entry.
            path: local file path for the screenshot.
            title: tooltip text for the embedded image.
            ext: file suffix handed to ``_device.capture``.

        Returns:
            tuple: ``(path, link)``, or ``('', '')`` when the device reports a
            state other than ONLINE.
        """
        # Do not try to capture while the device is disconnected.
        if hasattr(_device, 'test_device_state') and _device.test_device_state != TestDeviceState.ONLINE:
            _device.log.warning("device is offline")
            return '', ''

        if hasattr(_device, "capture"):
            # The device object must implement ``capture``; it may adjust the
            # link/path it is given (note the (link, path) return order).
            link, path = _device.capture(link, path, ext)
            # Shrink and re-encode the image (defaults: half size, quality 80).
            cls.compress_image(path)
        else:
            _device.log.debug("The device not implement capture function, don't capture!")
        if path and link:
            _device.log.info(
                '<a href="{}" target="_blank">Screenshot: {}'
                '<img style="display: block;" {} title="{}" src="{}"/>'
                '</a>'.format(link, path, cls.resize_image(path), title, link))
        return path, link

    @classmethod
    def __screen_and_save_picture(cls, _device, name, ext=".png", exe_type="takeImage"):
        """Capture the device screen and save it.

        Args:
            _device: device object to capture.
            name: picture name; the full path is derived through
                ``get_image_dir_path``.
            ext: picture suffix, ".png" or ".jpg".
            exe_type: naming/counter scheme, see ``get_take_picture_path``.

        Returns:
            tuple: ``(path, link)`` as returned by ``_do_capture``.
        """
        path, link = cls.get_image_dir_path(_device, name, ext, exe_type=exe_type)
        # The screenshot suffix may be changed inside the capture call.
        return cls._do_capture(_device, link, path, name, ext)

    @classmethod
    def capture_step_picture(cls, file_name, step_name, _device, ext=".png"):
        """Capture and save the screenshot for a test step.

        Args:
            file_name: str, name of the picture to save.
            step_name: str, step name (used as image title in the log).
            _device: object, the device object to capture.
            ext: str, picture suffix, ".png" or ".jpg".

        Returns:
            tuple: ``(path, link)``; ``('', '')`` if anything goes wrong.
        """
        try:
            path, save_name = cls.get_take_picture_path(_device, file_name, ext, exe_type="stepImage")
            link = os.path.join(DeccVariable.cur_case().name, save_name)
            # The screenshot suffix may be changed inside the capture call.
            return cls._do_capture(_device, link, path, step_name, ext)
        except Exception as e:
            log.error(f"take screenshot on step failed, reason: {e}")
        return '', ''

    @classmethod
    def compress_image(cls, img_path, ratio=0.5, quality=80):
        """Best-effort in-place downscale and JPEG re-encode of ``img_path``.

        Silently does nothing when the file is missing/empty, cannot be
        decoded, or when opencv-python / numpy are not installed.

        Args:
            img_path: path of the image to rewrite.
            ratio: scale factor applied to width and height.
            quality: JPEG quality passed to the encoder.
        """
        # A failed capture can hand us an empty or non-existent path.
        if not img_path or not os.path.isfile(img_path):
            return
        try:
            import cv2
            import numpy as np
            # fromfile/tofile are used instead of imread/imwrite, as in the
            # original code.
            # NOTE(review): presumably to cope with non-ASCII paths -- confirm.
            raw = np.fromfile(img_path, dtype=np.uint8)
            if raw.size == 0:
                return
            pic = cv2.imdecode(raw, -1)
            if pic is None:
                log.warning("can not decode image {}, skip compress".format(img_path))
                return
            # shape is (h, w) for single-channel and (h, w, c) otherwise.
            height, width = pic.shape[:2]
            new_size = (max(1, int(width * ratio)), max(1, int(height * ratio)))
            pic = cv2.resize(pic, new_size)
            params = [cv2.IMWRITE_JPEG_QUALITY, quality]
            # NOTE: the data written is JPEG regardless of the file suffix.
            encoded_ok, encoded = cv2.imencode('.jpeg', pic, params=params)
            if encoded_ok:
                encoded.tofile(img_path)
        except (ImportError, NameError):
            pass

    @classmethod
    def get_image_dir_path(cls, _device, name, ext=".png", exe_type="takeImage"):
        """Return ``(path, link)`` for a new capture, prefixed with the device time.

        The device clock is queried with ``date '+%Y%m%d_%H%M%S'``; if that
        fails the name is used without a timestamp prefix.

        Args:
            _device: device object used to run the shell command.
            name: base picture name.
            ext: file suffix including the dot.
            exe_type: naming/counter scheme, see ``get_take_picture_path``.
        """
        try:
            if hasattr(_device, "is_oh") or hasattr(_device, "is_mac"):
                phone_time = _device.execute_shell_command("date '+%Y%m%d_%H%M%S'").strip()
            else:
                phone_time = _device.connector.shell("date '+%Y%m%d_%H%M%S'").strip()
        except Exception as exception:
            _device.log.error("get date exception error")
            _device.log.debug("get date exception: {}".format(exception))
        else:
            name = "{}.{}".format(phone_time, name)
        path, save_name = cls.get_take_picture_path(_device, name, ext, exe_type)
        link = os.path.join(DeccVariable.cur_case().name, save_name)
        return path, link

    @classmethod
    def resize_image(cls, file_path, max_height=480, file_type="image"):
        """Return HTML ``width="..." height="..."`` attributes for a thumbnail.

        The media's real size is read with Pillow (images) or OpenCV
        (videos); 1080x1920 is assumed when the file does not exist or its
        size cannot be determined. Media shorter than ``max_height`` keep
        their size, anything else is scaled down to ``max_height``.

        Args:
            file_path: path of the image or video.
            max_height: maximum height of the rendered thumbnail.
            file_type: "image" or "video".
        """
        width, height = 1080, 1920
        ratio = 1
        try:
            if os.path.exists(file_path):
                if file_type == "image":
                    from PIL import Image
                    try:
                        # ``with`` closes the handle even if reading fails.
                        with Image.open(file_path) as img:
                            width, height = img.width, img.height
                    except OSError as error:
                        log.warning("get image width and height error: {}, use default".format(error))
                elif file_type == "video":
                    import cv2
                    try:
                        video_info = cv2.VideoCapture(file_path)
                        width = int(video_info.get(cv2.CAP_PROP_FRAME_WIDTH))
                        height = int(video_info.get(cv2.CAP_PROP_FRAME_HEIGHT))
                        video_info.release()
                    except Exception as e:
                        log.warning("get video width and height error: {}, use default".format(e))
                    if width == 0 or height == 0:
                        width, height = 1080, 1920
            if height < max_height:
                return 'width="%d" height="%d"' % (width, height)
            ratio = max_height / height
        except ImportError:
            log.error("Pillow or opencv-python is not installed ")
        except ZeroDivisionError:
            log.error("shot image height is 0")
        return 'width="%d" height="%d"' % (width * ratio, max_height)

    def terminate(self):
        """Remove the port forward, stop the helper process and wait for the worker.

        The worker thread is given up to 3 seconds to finish.
        """
        if isinstance(self.local_port, int):
            if hasattr(self._device, "is_oh") or \
                    self._device.usb_type == DeviceConnectorType.hdc:
                command = 'fport rm tcp:{}'
            else:
                command = 'forward --remove tcp:{}'
            self._device.connector_command(command.format(self.local_port))
        if self.proc is not None:
            stop_standing_subprocess(self.proc)
        if self.thread is not None:
            deadline = time.time() + 3
            # Wait for pending pictures to be written before the task ends.
            # ``is_alive`` replaces ``isAlive``, which was removed in Python 3.9.
            while self.thread.is_alive() and time.time() < deadline:
                time.sleep(0.1)