#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
1876e6818aSopenharmony_ci
1976e6818aSopenharmony_ciimport os
2076e6818aSopenharmony_ciimport platform
2176e6818aSopenharmony_ciimport shutil
2276e6818aSopenharmony_ciimport stat
2376e6818aSopenharmony_ciimport sys
2476e6818aSopenharmony_ciimport time
2576e6818aSopenharmony_ciimport zipfile
2676e6818aSopenharmony_ciimport hashlib
2776e6818aSopenharmony_ci
2876e6818aSopenharmony_cifrom devicetest.core.exception import DeviceTestError
2976e6818aSopenharmony_cifrom devicetest.core.variables import get_project_path
3076e6818aSopenharmony_cifrom devicetest.error import ErrorMessage
3176e6818aSopenharmony_cifrom devicetest.error import ErrorCategory
3276e6818aSopenharmony_cifrom devicetest.log.logger import DeviceTestLog as log
3376e6818aSopenharmony_ci
3476e6818aSopenharmony_ci
def get_template_path(template_file_path, isdir=None):
    """
    Resolve a template path to an absolute path.

    Backslashes are normalised to "/". An absolute path that is an existing
    file is returned directly (unless a directory was requested). Paths
    beginning with "resource" are searched, minus their first 9 characters,
    in EnvPool.resource_path and then in sys.ecotest_resource_path; any
    other path is delegated to get_resource_path.

    @param template_file_path: template path, absolute or relative.
    @param isdir: Obtain the directory: True; Obtain the file: False;
                  None: Ignore the file type
    @return: the resolved path, or None when a "resource" path is not found.
    """
    template = template_file_path.replace("\\", "/")
    if not isdir and os.path.isabs(template) and os.path.isfile(template):
        return os.path.abspath(template)

    # A leading "/" on something that is not an existing file is dropped so
    # the remainder can be treated as a relative path.
    if template.startswith("/") and not os.path.isfile(template):
        template = template[1:]

    if not template.startswith("resource"):
        # NOTE(review): isdir is not forwarded here, so the lookup ignores
        # the requested type - confirm whether that is intended.
        located = get_resource_path(template)
    else:
        located = None
        # Skip the 9 leading characters (presumably "resource/").
        relative = template[9:]
        from xdevice import EnvPool
        pool_root = EnvPool.resource_path
        if pool_root is not None:
            base = os.path.abspath(pool_root)
            located = travesal_folder(base, relative, isdir)
            if located is None:
                log.debug("Not found [%s] in env pool path %s, "
                          "continue to find template in resource path." % (
                              relative, base))
        if located is None:
            # NOTE(review): the default is "" (not None), so when the
            # attribute is unset the search base is abspath("").
            eco_root = getattr(sys, "ecotest_resource_path", "")
            if eco_root is not None:
                base = os.path.abspath(eco_root)
                located = travesal_folder(base, relative, isdir)
                if located is None:
                    log.debug("Not found [%s] in resource path %s, "
                              "continue to find template in other path." % (
                                  relative, base))

    log.debug("get template path:{}".format(located))
    return located
7476e6818aSopenharmony_ci
7576e6818aSopenharmony_ci
def get_resource_path(resource_file_path, isdir=None):
    """
    Resolve a resource path to an absolute path.

    An absolute path of the requested type is returned directly. Otherwise
    the following roots are searched in order with travesal_folder, stopping
    at the first hit: EnvPool.resource_path, sys.ecotest_resource_path,
    DeccVariable.project.resource_path, DeccVariable.project.test_suite_path
    and get_project_path().

    @param resource_file_path: resource path, absolute or relative.
    @param isdir: Obtain the directory: True; Obtain the file: False;
                  None: Ignore the file type
    @return: the resolved path.
    @raise DeviceTestError: when the resource is found in none of the roots.
    """
    target = resource_file_path.replace("\\", "/")
    if os.path.isabs(target):
        if isdir is None:
            usable = os.path.exists(target)
        elif isdir:
            usable = os.path.isdir(target)
        else:
            usable = os.path.isfile(target)
        if usable:
            return os.path.abspath(target)

    def _search(root):
        # Returns the absolute search root together with the lookup result.
        base = os.path.abspath(root)
        return base, travesal_folder(base, target, isdir)

    found = None
    from xdevice import EnvPool
    if EnvPool.resource_path is not None:
        base, found = _search(EnvPool.resource_path)
        if found is None:
            log.debug("Not found [%s] in env pool path %s, "
                      "continue to find in project resource path." % (
                          target, base))

    if found is None:
        # NOTE(review): the default is "" (not None), so when the attribute
        # is unset the search base is abspath("").
        eco_root = getattr(sys, "ecotest_resource_path", "")
        if eco_root is not None:
            base, found = _search(eco_root)
            if found is None:
                log.debug("Not found [%s] in ecotest path %s, "
                          "continue to find in suit path." % (
                              target, base))

    from devicetest.core.variables import DeccVariable
    if found is None:
        base, found = _search(DeccVariable.project.resource_path)
        if found is None:
            log.debug("Not found [%s] in product path %s, "
                      "continue to find in project resource path." % (
                          target, base))

    if found is None:
        base, found = _search(DeccVariable.project.test_suite_path)
        if found is None:
            log.debug("Not found [%s] in product path %s, "
                      "continue to find in suit resource path." % (
                          target, base))

    if found is None:
        base, found = _search(get_project_path())
        if found is None:
            log.debug("Not found [%s] in product path %s, "
                      "continue to find in project path." % (
                          target, base))

    if found is not None:
        log.debug("get resource path:{}".format(found))
        return found

    err_msg = ErrorMessage.Common.Code_0201008.format(target)
    log.error(err_msg)
    raise DeviceTestError(err_msg)
14076e6818aSopenharmony_ci
14176e6818aSopenharmony_ci
def travesal_folder(folder, folder_file_path, isdir=False):
    """
    Search ``folder`` recursively for ``folder_file_path``.

    The direct join ``folder/folder_file_path`` is tried first. If it does
    not match, every child of ``folder`` (except ".svn") is visited: a child
    whose path ends with ``os.sep + folder_file_path`` and has the requested
    type is returned, and sub-directories are searched recursively.

    @param folder: directory in which to search.
    @param folder_file_path: relative path or bare name to look for.
    @param isdir: True: match directories only; False: match files only;
                  None: match either.
    @return: path of the first match (absolute for the direct-join match,
             ``folder``-based for matches found while walking children),
             or None if nothing matches or ``folder`` is not a directory.
    """
    any_type = isdir is None
    candidate = os.path.join(folder, folder_file_path)
    if (any_type and os.path.exists(candidate)) \
            or (not isdir and os.path.isfile(candidate)) \
            or (isdir and os.path.isdir(candidate)):
        return os.path.abspath(candidate)

    # Only a directory can be listed. Checking isdir (rather than exists)
    # keeps os.listdir from raising when ``folder`` is an existing file.
    if not os.path.isdir(folder):
        return None

    suffix = os.sep + folder_file_path
    for child in os.listdir(folder):
        if child == ".svn":
            continue

        child_path = os.path.join(folder, child)
        if os.path.isdir(child_path):
            if (any_type or isdir) and child_path.endswith(suffix):
                return child_path
            nested = travesal_folder(child_path, folder_file_path, isdir)
            if nested is not None:
                return nested
        elif os.path.isfile(child_path) \
                and child_path.endswith(suffix) \
                and (any_type or not isdir):
            return child_path

    return None
17276e6818aSopenharmony_ci
17376e6818aSopenharmony_ci
def os_open_file_write(file_path, content, mode="w"):
    """
    Write ``content`` to ``file_path``, creating missing parent directories.

    The file is opened through os.open with owner read/write permission
    bits for newly created files. Because os.fdopen does not truncate an
    already-open descriptor, O_TRUNC is requested explicitly for "w" modes
    so that shorter content fully replaces a longer existing file; "a"
    modes request O_APPEND.

    @param file_path: path of the file to write.
    @param content: data to write; str for text modes, bytes for binary.
    @param mode: mode passed to os.fdopen, e.g. "w", "wb", "a".
    @raise DeviceTestError: on any failure (message Code_0201009), chained
                            to the original exception.
    """
    try:
        flags = os.O_WRONLY | os.O_CREAT
        if "a" in mode:
            flags |= os.O_APPEND
        elif "w" in mode:
            flags |= os.O_TRUNC
        modes = stat.S_IWUSR | stat.S_IRUSR
        dir_path = os.path.dirname(file_path)
        # dirname is "" for a bare file name; there is nothing to create then.
        if dir_path and not os.path.isdir(dir_path):
            os.makedirs(dir_path, exist_ok=True)
        with os.fdopen(os.open(file_path, flags, modes), mode) as fout:
            fout.write(content)
    except Exception as error:
        err_msg = ErrorMessage.Common.Code_0201009
        log.error(err_msg, is_traceback=True)
        raise DeviceTestError(err_msg) from error
18776e6818aSopenharmony_ci
18876e6818aSopenharmony_ci
def os_open_file_read(file_path, mode="r"):
    """
    Read and return the whole content of ``file_path``.

    @param file_path: path of the file to read.
    @param mode: mode passed to os.fdopen, e.g. "r" or "rb".
    @return: the file content as returned by read() for the given mode.
    @raise DeviceTestError: message Code_0201001 when the file does not
                            exist, Code_0201010 for any other failure; the
                            original exception is chained in both cases.
    """
    open_flags = os.O_RDONLY
    permission = stat.S_IWUSR | stat.S_IRUSR
    try:
        descriptor = os.open(file_path, open_flags, permission)
        with os.fdopen(descriptor, mode) as reader:
            return reader.read()
    except Exception as error:
        if isinstance(error, FileNotFoundError):
            err_msg = ErrorMessage.Common.Code_0201001.format(
                ErrorCategory.Environment, file_path)
        else:
            err_msg = ErrorMessage.Common.Code_0201010
        log.error(err_msg, is_traceback=True)
        raise DeviceTestError(err_msg) from error
20376e6818aSopenharmony_ci
20476e6818aSopenharmony_ci
def save_file(file_path, content):
    """Write ``content`` to ``file_path`` using binary write mode ("wb")."""
    binary_write_mode = "wb"
    os_open_file_write(file_path, content, mode=binary_write_mode)
20776e6818aSopenharmony_ci
20876e6818aSopenharmony_ci
def create_dir(create_path):
    """
    Create a directory, including missing parents, unless the path exists.

    Args:
        create_path: The path of the directory to create; "~" is expanded
            and the path is made absolute before use.
    """
    target = os.path.abspath(os.path.expanduser(create_path))
    if os.path.exists(target):
        # Anything already present at the path (file or directory) is left
        # untouched.
        return
    os.makedirs(target, exist_ok=True)
21876e6818aSopenharmony_ci
21976e6818aSopenharmony_ci
def to_file(filename, content):
    """
    Generate a file: ensure the parent directory of ``filename`` exists,
    then write ``content`` to it in binary write mode ("wb").
    """
    parent = os.path.dirname(filename)
    parent_ready = os.path.isdir(parent)
    if not parent_ready:
        os.makedirs(parent)
    os_open_file_write(filename, content, mode="wb")
22876e6818aSopenharmony_ci
22976e6818aSopenharmony_ci
def delfile(filename):
    """
    Delete a file on a best-effort basis.

    The first failure is logged and, if the file still exists, removal is
    retried once with os.remove on every platform. A failed retry is logged
    instead of raised. After that the function polls up to 5 times, 0.1 s
    apart, for the file to disappear and logs an error if it is still there.

    @param filename: path of the file to delete.
    """
    try:
        os.remove(filename)
    except Exception as exception:
        log.error(exception)
        if os.path.isfile(filename):
            # shutil.rmtree only works on directories, so the retry must
            # use os.remove regardless of the platform.
            try:
                os.remove(filename)
            except OSError as retry_error:
                log.error(retry_error)

    for _ in range(5):
        if not os.path.isfile(filename):
            break
        time.sleep(0.1)

    if os.path.isfile(filename):
        log.error("Delete file %s failed." % filename)
25076e6818aSopenharmony_ci
25176e6818aSopenharmony_ci
def delfolder(dirname):
    """
    Delete a directory tree.

    shutil.rmtree is attempted once; if it raises and the directory still
    exists, it is attempted a second time (an error from that second attempt
    propagates). The function then polls up to 5 times, 0.1 s apart, for the
    directory to disappear and logs an error if it is still present.

    @param dirname: path of the directory to delete.
    """
    try:
        shutil.rmtree(dirname)
    except Exception:
        if os.path.isdir(dirname):
            shutil.rmtree(dirname)

    checks_left = 5
    while checks_left > 0 and os.path.isdir(dirname):
        time.sleep(0.1)
        checks_left -= 1

    if os.path.isdir(dirname):
        log.error("Delete folder %s failed." % dirname)
26876e6818aSopenharmony_ci
26976e6818aSopenharmony_ci
def copy_to_folder(src, des):
    """
    Copy a file, or the children of a folder, into the folder ``des``.

    When ``src`` is a file it is copied into ``des``. When ``src`` is a
    folder, each file directly inside it is copied into ``des`` and each
    sub-folder is copied recursively to ``des/<name>`` with shutil.copytree
    (which raises if that destination already exists). ``des`` is created
    when missing. The process working directory is left unchanged.

    @param src: source file or folder; an error is logged and nothing is
                copied when it does not exist.
    @param des: destination folder; an error is logged and nothing is
                copied when it exists but is not a folder.
    """
    src = os.path.normpath(src)
    des = os.path.normpath(des)
    if not os.path.exists(src):
        log.error("No found [%s]" % src)
        return
    if not os.path.exists(des):
        create_dir(des)
    if not os.path.isdir(des):
        log.error("[%s] is not a folder." % des)
        return

    if not os.path.isdir(src):
        shutil.copy(src, des)
        return

    # Enumerate the SOURCE folder; listing the destination would copy
    # nothing into a fresh target.
    for name in os.listdir(src):
        source = os.path.join(src, name)
        if os.path.isfile(source):
            shutil.copy(source, des)
        elif os.path.isdir(source):
            shutil.copytree(source, os.path.join(des, name))
29676e6818aSopenharmony_ci
29776e6818aSopenharmony_ci
def delete_file_folder(src):
    """
    @summary: Delete a file (via delfile) or a directory (via delfolder);
              a path that is neither is ignored.
    """
    if os.path.isfile(src):
        delfile(src)
        return
    if os.path.isdir(src):
        delfolder(src)
30776e6818aSopenharmony_ci
30876e6818aSopenharmony_ci
def get_file_md5(file_name: str):
    """
    @summary: Compute the MD5 hash of a file, reading it in 4096-byte chunks.
    :param file_name: path of the file to hash.
    :return: the hexadecimal MD5 digest string.
    """
    digest = hashlib.md5()  # create the md5 object
    flags = os.O_RDONLY
    if platform.system() == "Windows":
        # Request binary mode on Windows.
        flags |= os.O_BINARY
    with os.fdopen(os.open(file_name, flags, 0o644), mode="rb") as stream:
        # read() returns b"" at end of file, which stops the iteration.
        for chunk in iter(lambda: stream.read(4096), b""):
            digest.update(chunk)  # feed the chunk into the md5 object
    return digest.hexdigest()
32876e6818aSopenharmony_ci
32976e6818aSopenharmony_ci
def compress_and_remove(folder_path: str, archive_name: str) -> bool:
    """
    @summary: Zip the content of a folder, then delete the folder.
    :param folder_path: folder whose content is archived and then removed.
    :param archive_name: base name passed to shutil.make_archive (the
                         archive is created in 'zip' format).
    :return: True on success; False if archiving or removal raised (the
             exception is logged).
    """
    try:
        shutil.make_archive(base_name=archive_name, format='zip',
                            root_dir=folder_path)
        shutil.rmtree(folder_path)
    except Exception as e:
        log.error(e)
        return False
    return True
34476e6818aSopenharmony_ci
34576e6818aSopenharmony_ci
def unzip_file(zip_path, extract_to):
    """
    Extract the given ZIP file into the target directory.

    :param zip_path: path of the ZIP file
    :param extract_to: target directory to extract into; created when the
                       path does not exist
    """
    # Make sure the target directory exists.
    destination_missing = not os.path.exists(extract_to)
    if destination_missing:
        os.makedirs(extract_to)

    # Open the ZIP file and extract every member into the target directory.
    with zipfile.ZipFile(zip_path, mode='r') as archive:
        archive.extractall(path=extract_to)
361