176e6818aSopenharmony_ci#!/usr/bin/env python3
276e6818aSopenharmony_ci# coding=utf-8
376e6818aSopenharmony_ci
476e6818aSopenharmony_ci#
576e6818aSopenharmony_ci# Copyright (c) 2022 Huawei Device Co., Ltd.
676e6818aSopenharmony_ci# Licensed under the Apache License, Version 2.0 (the "License");
776e6818aSopenharmony_ci# you may not use this file except in compliance with the License.
876e6818aSopenharmony_ci# You may obtain a copy of the License at
976e6818aSopenharmony_ci#
1076e6818aSopenharmony_ci#     http://www.apache.org/licenses/LICENSE-2.0
1176e6818aSopenharmony_ci#
1276e6818aSopenharmony_ci# Unless required by applicable law or agreed to in writing, software
1376e6818aSopenharmony_ci# distributed under the License is distributed on an "AS IS" BASIS,
1476e6818aSopenharmony_ci# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1576e6818aSopenharmony_ci# See the License for the specific language governing permissions and
1676e6818aSopenharmony_ci# limitations under the License.
1776e6818aSopenharmony_ci#
1876e6818aSopenharmony_ciimport os
1976e6818aSopenharmony_ciimport time
2076e6818aSopenharmony_ci
2176e6818aSopenharmony_cifrom devicetest.core.variables import DeccVariable
2276e6818aSopenharmony_cifrom devicetest.log.logger import DeviceTestLog as log
2376e6818aSopenharmony_cifrom devicetest.utils.file_util import create_dir
2476e6818aSopenharmony_cifrom xdevice import stop_standing_subprocess
2576e6818aSopenharmony_cifrom xdevice import DeviceConnectorType
2676e6818aSopenharmony_cifrom xdevice import TestDeviceState
2776e6818aSopenharmony_ci
2876e6818aSopenharmony_ciLOCAL_IP = "127.0.0.1"
2976e6818aSopenharmony_ciLOCAL_PORT = 6001
3076e6818aSopenharmony_ciURL = "/"
3176e6818aSopenharmony_ciFORWARD_PORT = 9501
3276e6818aSopenharmony_ci
3376e6818aSopenharmony_ci
3476e6818aSopenharmony_ciclass ScreenAgent:
3576e6818aSopenharmony_ci    SCREEN_AGENT_MAP = {}
3676e6818aSopenharmony_ci
3776e6818aSopenharmony_ci    def __init__(self, device):
3876e6818aSopenharmony_ci        self._device = device
3976e6818aSopenharmony_ci        self.log = device.log
4076e6818aSopenharmony_ci        self.proc = None
4176e6818aSopenharmony_ci        self.thread = None
4276e6818aSopenharmony_ci        self.local_port = None
4376e6818aSopenharmony_ci        self.is_server_started = False
4476e6818aSopenharmony_ci
4576e6818aSopenharmony_ci    def __del__(self):
4676e6818aSopenharmony_ci        self.terminate()
4776e6818aSopenharmony_ci
4876e6818aSopenharmony_ci    @classmethod
4976e6818aSopenharmony_ci    def get_instance(cls, _device):
5076e6818aSopenharmony_ci        _device.log.debug("in get instance.")
5176e6818aSopenharmony_ci        instance_sn = _device.device_sn
5276e6818aSopenharmony_ci        if instance_sn in ScreenAgent.SCREEN_AGENT_MAP:
5376e6818aSopenharmony_ci            return ScreenAgent.SCREEN_AGENT_MAP[instance_sn]
5476e6818aSopenharmony_ci
5576e6818aSopenharmony_ci        agent = ScreenAgent(_device)
5676e6818aSopenharmony_ci        ScreenAgent.SCREEN_AGENT_MAP[instance_sn] = agent
5776e6818aSopenharmony_ci        _device.log.debug("out get instance.")
5876e6818aSopenharmony_ci        return agent
5976e6818aSopenharmony_ci
6076e6818aSopenharmony_ci    @classmethod
6176e6818aSopenharmony_ci    def remove_instance(cls, _device):
6276e6818aSopenharmony_ci        _sn = _device.device_sn
6376e6818aSopenharmony_ci        if _sn in ScreenAgent.SCREEN_AGENT_MAP:
6476e6818aSopenharmony_ci            ScreenAgent.SCREEN_AGENT_MAP[_sn].terminate()
6576e6818aSopenharmony_ci            del ScreenAgent.SCREEN_AGENT_MAP[_sn]
6676e6818aSopenharmony_ci
6776e6818aSopenharmony_ci    @classmethod
6876e6818aSopenharmony_ci    def get_screenshot_dir(cls):
6976e6818aSopenharmony_ci        base_path = DeccVariable.cur_case().case_screenshot_dir
7076e6818aSopenharmony_ci        return os.path.join(base_path, DeccVariable.cur_case().suite_name, DeccVariable.cur_case().name)
7176e6818aSopenharmony_ci
7276e6818aSopenharmony_ci    @classmethod
7376e6818aSopenharmony_ci    def get_take_picture_path(cls, _device, picture_name,
7476e6818aSopenharmony_ci                              ext=".png", exe_type="takeImage"):
7576e6818aSopenharmony_ci        """新增参数exeType,默认值为takeImage;可取值takeImage/dumpWindow"""
7676e6818aSopenharmony_ci        if os.path.isfile(picture_name):
7776e6818aSopenharmony_ci            folder = os.path.dirname(picture_name)
7876e6818aSopenharmony_ci            create_dir(folder)
7976e6818aSopenharmony_ci            return picture_name, os.path.basename(picture_name)
8076e6818aSopenharmony_ci
8176e6818aSopenharmony_ci        folder = cls.get_screenshot_dir()
8276e6818aSopenharmony_ci        create_dir(folder)
8376e6818aSopenharmony_ci        if picture_name.endswith(ext):
8476e6818aSopenharmony_ci            picture_name = picture_name.strip(ext)
8576e6818aSopenharmony_ci
8676e6818aSopenharmony_ci        if exe_type == "takeImage":
8776e6818aSopenharmony_ci            save_name = "{}.{}{}{}".format(
8876e6818aSopenharmony_ci                _device.device_sn.replace("?", "sn").replace(":", "_"), picture_name,
8976e6818aSopenharmony_ci                DeccVariable.cur_case().image_num, ext)
9076e6818aSopenharmony_ci        elif exe_type == "videoRecord":
9176e6818aSopenharmony_ci            save_name = "{}.{}{}{}".format(
9276e6818aSopenharmony_ci                _device.device_sn.replace("?", "sn").replace(":", "_"), picture_name,
9376e6818aSopenharmony_ci                DeccVariable.cur_case().video_num, ext)
9476e6818aSopenharmony_ci        elif exe_type == "stepImage":
9576e6818aSopenharmony_ci            save_name = "{}.{}{}".format(
9676e6818aSopenharmony_ci                _device.device_sn.replace("?", "sn").replace(":", "_"), picture_name, ext)
9776e6818aSopenharmony_ci        else:
9876e6818aSopenharmony_ci            save_name = "{}.{}{}{}".format(
9976e6818aSopenharmony_ci                _device.device_sn.replace("?", "sn").replace(":", "_"), picture_name,
10076e6818aSopenharmony_ci                DeccVariable.cur_case().dump_xml_num, ext)
10176e6818aSopenharmony_ci
10276e6818aSopenharmony_ci        fol_path = os.path.join(folder, save_name)
10376e6818aSopenharmony_ci        if exe_type == "takeImage":
10476e6818aSopenharmony_ci            DeccVariable.cur_case().image_num += 1
10576e6818aSopenharmony_ci        elif exe_type == "videoRecord":
10676e6818aSopenharmony_ci            DeccVariable.cur_case().video_num += 1
10776e6818aSopenharmony_ci        else:
10876e6818aSopenharmony_ci            if exe_type != "stepImage":
10976e6818aSopenharmony_ci                DeccVariable.cur_case().dump_xml_num += 1
11076e6818aSopenharmony_ci        return fol_path, save_name
11176e6818aSopenharmony_ci
11276e6818aSopenharmony_ci    @classmethod
11376e6818aSopenharmony_ci    def screen_take_picture(cls, args, result, _ta=None, is_raise_exception=True):
11476e6818aSopenharmony_ci        # When the phone is off, you can set the screenshot off function
11576e6818aSopenharmony_ci        pass
11676e6818aSopenharmony_ci
11776e6818aSopenharmony_ci    @classmethod
11876e6818aSopenharmony_ci    def _do_capture(cls, _device, link, path, title, ext=".png"):
11976e6818aSopenharmony_ci        # 设备处于断开状态,不执行截图
12076e6818aSopenharmony_ci        if hasattr(_device, 'test_device_state') and _device.test_device_state != TestDeviceState.ONLINE:
12176e6818aSopenharmony_ci            _device.log.warning("device is offline")
12276e6818aSopenharmony_ci            return '', ''
12376e6818aSopenharmony_ci
12476e6818aSopenharmony_ci        if hasattr(_device, "capture"):
12576e6818aSopenharmony_ci            # 截图需要设备对象实现capture方法
12676e6818aSopenharmony_ci            link, path = _device.capture(link, path, ext)
12776e6818aSopenharmony_ci            # 压缩图片为80%
12876e6818aSopenharmony_ci            cls.compress_image(path)
12976e6818aSopenharmony_ci        else:
13076e6818aSopenharmony_ci            _device.log.debug("The device not implement capture function, don't capture!")
13176e6818aSopenharmony_ci        if path and link:
13276e6818aSopenharmony_ci            _device.log.info(
13376e6818aSopenharmony_ci                '<a href="{}" target="_blank">Screenshot: {}'
13476e6818aSopenharmony_ci                '<img style="display: block;" {} title="{}" src="{}"/>'
13576e6818aSopenharmony_ci                '</a>'.format(link, path, cls.resize_image(path), title, link))
13676e6818aSopenharmony_ci        return path, link
13776e6818aSopenharmony_ci
13876e6818aSopenharmony_ci    @classmethod
13976e6818aSopenharmony_ci    def __screen_and_save_picture(cls, _device, name, ext=".png", exe_type="takeImage"):
14076e6818aSopenharmony_ci        """
14176e6818aSopenharmony_ci        @summary: 截取设备屏幕图片并保存
14276e6818aSopenharmony_ci        @param  name: 保存的图片名称,通过getTakePicturePath方法获取保存全路径
14376e6818aSopenharmony_ci                ext: 保存图片后缀,支持".png"、".jpg"格式
14476e6818aSopenharmony_ci        """
14576e6818aSopenharmony_ci        path, link = cls.get_image_dir_path(_device, name, ext, exe_type=exe_type)
14676e6818aSopenharmony_ci        # 截图文件后缀在方法内可能发生更改
14776e6818aSopenharmony_ci        return cls._do_capture(_device, link, path, name, ext)
14876e6818aSopenharmony_ci
14976e6818aSopenharmony_ci    @classmethod
15076e6818aSopenharmony_ci    def capture_step_picture(cls, file_name, step_name, _device, ext=".png"):
15176e6818aSopenharmony_ci        """截取step步骤图片并保存
15276e6818aSopenharmony_ci        file_name: str, 保存的图片名称
15376e6818aSopenharmony_ci        step_name: str, step步骤名称
15476e6818aSopenharmony_ci        _device  : object, the device object to capture
15576e6818aSopenharmony_ci        ext : str, 保存图片后缀,支持".png"、".jpg"格式
15676e6818aSopenharmony_ci        """
15776e6818aSopenharmony_ci        try:
15876e6818aSopenharmony_ci            path, save_name = cls.get_take_picture_path(_device, file_name, ext, exe_type="stepImage")
15976e6818aSopenharmony_ci            link = os.path.join(DeccVariable.cur_case().name, save_name)
16076e6818aSopenharmony_ci            # 截图文件后缀在方法内可能发生更改
16176e6818aSopenharmony_ci            return cls._do_capture(_device, link, path, step_name, ext)
16276e6818aSopenharmony_ci        except Exception as e:
16376e6818aSopenharmony_ci            log.error(f"take screenshot on step failed, reason: {e}")
16476e6818aSopenharmony_ci        return '', ''
16576e6818aSopenharmony_ci
16676e6818aSopenharmony_ci    @classmethod
16776e6818aSopenharmony_ci    def compress_image(cls, img_path, ratio=0.5, quality=80):
16876e6818aSopenharmony_ci        try:
16976e6818aSopenharmony_ci            import cv2
17076e6818aSopenharmony_ci            import numpy as np
17176e6818aSopenharmony_ci            pic = cv2.imdecode(np.fromfile(img_path, dtype=np.uint8), -1)
17276e6818aSopenharmony_ci            height, width, deep = pic.shape
17376e6818aSopenharmony_ci            width, height = (width * ratio, height * ratio)
17476e6818aSopenharmony_ci            pic = cv2.resize(pic, (int(width), int(height)))
17576e6818aSopenharmony_ci            params = [cv2.IMWRITE_JPEG_QUALITY, quality]
17676e6818aSopenharmony_ci            cv2.imencode('.jpeg', pic, params=params)[1].tofile(img_path)
17776e6818aSopenharmony_ci        except (ImportError, NameError):
17876e6818aSopenharmony_ci            pass
17976e6818aSopenharmony_ci
18076e6818aSopenharmony_ci    @classmethod
18176e6818aSopenharmony_ci    def get_image_dir_path(cls, _device, name, ext=".png", exe_type="takeImage"):
18276e6818aSopenharmony_ci        """
18376e6818aSopenharmony_ci        增加了 exeType参数,默认为takeImage;可取值:takeImage/dumpWindow
18476e6818aSopenharmony_ci        """
18576e6818aSopenharmony_ci        try:
18676e6818aSopenharmony_ci            if hasattr(_device, "is_oh") or hasattr(_device, "is_mac"):
18776e6818aSopenharmony_ci                phone_time = _device.execute_shell_command("date '+%Y%m%d_%H%M%S'").strip()
18876e6818aSopenharmony_ci            else:
18976e6818aSopenharmony_ci                phone_time = _device.connector.shell("date '+%Y%m%d_%H%M%S'").strip()
19076e6818aSopenharmony_ci        except Exception as exception:
19176e6818aSopenharmony_ci            _device.log.error("get date exception error")
19276e6818aSopenharmony_ci            _device.log.debug("get date exception: {}".format(exception))
19376e6818aSopenharmony_ci        else:
19476e6818aSopenharmony_ci            name = "{}.{}".format(phone_time, name)
19576e6818aSopenharmony_ci        path, save_name = cls.get_take_picture_path(_device, name, ext, exe_type)
19676e6818aSopenharmony_ci        link = os.path.join(DeccVariable.cur_case().name, save_name)
19776e6818aSopenharmony_ci        return path, link
19876e6818aSopenharmony_ci
19976e6818aSopenharmony_ci    @classmethod
20076e6818aSopenharmony_ci    def resize_image(cls, file_path, max_height=480, file_type="image"):
20176e6818aSopenharmony_ci        width, height = 1080, 1920
20276e6818aSopenharmony_ci        ratio = 1
20376e6818aSopenharmony_ci        try:
20476e6818aSopenharmony_ci            if os.path.exists(file_path):
20576e6818aSopenharmony_ci                if file_type == "image":
20676e6818aSopenharmony_ci                    from PIL import Image
20776e6818aSopenharmony_ci                    img = Image.open(file_path)
20876e6818aSopenharmony_ci                    width, height = img.width, img.height
20976e6818aSopenharmony_ci                    img.close()
21076e6818aSopenharmony_ci                elif file_type == "video":
21176e6818aSopenharmony_ci                    import cv2
21276e6818aSopenharmony_ci                    try:
21376e6818aSopenharmony_ci                        video_info = cv2.VideoCapture(file_path)
21476e6818aSopenharmony_ci                        width = int(video_info.get(cv2.CAP_PROP_FRAME_WIDTH))
21576e6818aSopenharmony_ci                        height = int(video_info.get(cv2.CAP_PROP_FRAME_HEIGHT))
21676e6818aSopenharmony_ci                        video_info.release()
21776e6818aSopenharmony_ci                    except Exception as e:
21876e6818aSopenharmony_ci                        log.warning("get video width and height error: {}, use default".format(e))
21976e6818aSopenharmony_ci                    if width == 0 or height == 0:
22076e6818aSopenharmony_ci                        width, height = 1080, 1920
22176e6818aSopenharmony_ci            if height < max_height:
22276e6818aSopenharmony_ci                return 'width="%d" height="%d"' % (width, height)
22376e6818aSopenharmony_ci            ratio = max_height / height
22476e6818aSopenharmony_ci        except ImportError:
22576e6818aSopenharmony_ci            log.error("Pillow or opencv-python is not installed ")
22676e6818aSopenharmony_ci        except ZeroDivisionError:
22776e6818aSopenharmony_ci            log.error("shot image height is 0")
22876e6818aSopenharmony_ci        return 'width="%d" height="%d"' % (width * ratio, max_height)
22976e6818aSopenharmony_ci
23076e6818aSopenharmony_ci    def terminate(self):
23176e6818aSopenharmony_ci        if self.local_port is not None and isinstance(self.local_port, int):
23276e6818aSopenharmony_ci            if hasattr(self._device, "is_oh") or \
23376e6818aSopenharmony_ci                    self._device.usb_type == DeviceConnectorType.hdc:
23476e6818aSopenharmony_ci                self._device.connector_command('fport rm tcp:{}'.format(self.local_port))
23576e6818aSopenharmony_ci            else:
23676e6818aSopenharmony_ci                self._device.connector_command('forward --remove tcp:{}'.format(self.local_port))
23776e6818aSopenharmony_ci        if self.proc is not None:
23876e6818aSopenharmony_ci            stop_standing_subprocess(self.proc)
23976e6818aSopenharmony_ci        if self.thread is not None:
24076e6818aSopenharmony_ci            start = time.time()
24176e6818aSopenharmony_ci            # 任务结束要等图片生成完
24276e6818aSopenharmony_ci            while self.thread.isAlive() and time.time() - start < 3:
24376e6818aSopenharmony_ci                time.sleep(0.1)
244