1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import platform
21import shutil
22import stat
23import sys
24import time
25import zipfile
26import hashlib
27
28from devicetest.core.exception import DeviceTestError
29from devicetest.core.variables import get_project_path
30from devicetest.error import ErrorMessage
31from devicetest.error import ErrorCategory
32from devicetest.log.logger import DeviceTestLog as log
33
34
def get_template_path(template_file_path, isdir=None):
    """
    Locate a template file and return its path.

    Backslashes in the given path are converted to "/" first. An absolute
    path that is an existing file is returned immediately (only when isdir
    is None or False). Otherwise one leading "/" is stripped and the lookup
    depends on the prefix:
      - a path starting with "resource" has its first 9 characters dropped
        (presumably the "resource/" prefix - TODO confirm) and the remainder
        is searched under EnvPool.resource_path, then under
        sys.ecotest_resource_path;
      - any other path is resolved by get_resource_path(), which is called
        without the isdir argument.

    @param template_file_path: path of the template file, absolute or relative.
    @param isdir: Obtain the directory: True; Obtain the file: False;
                  None: Ignore the file type
    @return: the located path; None when a "resource" path cannot be found.
    """
    template_file_path = template_file_path.replace("\\", "/")
    if os.path.isabs(template_file_path) and not isdir \
            and os.path.isfile(template_file_path):
        return os.path.abspath(template_file_path)

    # Drop one leading "/" so the path can be joined to a search root.
    if template_file_path.startswith("/") \
            and not os.path.isfile(template_file_path):
        template_file_path = template_file_path[1:]

    located = None
    if not template_file_path.startswith("resource"):
        located = get_resource_path(template_file_path)
    else:
        relative_path = template_file_path[9:]
        from xdevice import EnvPool
        if EnvPool.resource_path is not None:
            search_root = os.path.abspath(EnvPool.resource_path)
            located = travesal_folder(search_root, relative_path, isdir)
            if located is None:
                log.debug("Not found [%s] in env pool path %s, "
                          "continue to find template in resource path." % (
                              relative_path, search_root))
        if located is None:
            # The default is "" (not None), so when the attribute is unset
            # abspath() below resolves to the current working directory.
            ecotest_root = getattr(sys, "ecotest_resource_path", "")
            if ecotest_root is not None:
                search_root = os.path.abspath(ecotest_root)
                located = travesal_folder(search_root, relative_path, isdir)
                if located is None:
                    log.debug("Not found [%s] in resource path %s, "
                              "continue to find template in other path." % (
                                  relative_path, search_root))
    log.debug("get template path:{}".format(located))
    return located
74
75
def get_resource_path(resource_file_path, isdir=None):
    """
    Locate a resource file or directory and return its path.

    Backslashes are converted to "/" first. An absolute path that exists
    with the requested kind is returned immediately. Otherwise these roots
    are searched in order with travesal_folder() until one yields a hit:
      1. EnvPool.resource_path (skipped when it is None)
      2. sys.ecotest_resource_path (skipped when it is None)
      3. DeccVariable.project.resource_path
      4. DeccVariable.project.test_suite_path
      5. get_project_path()

    @param resource_file_path: path of the resource, absolute or relative.
    @param isdir: Obtain the directory: True; Obtain the file: False;
                  None: Ignore the file type
    @return: path of the located resource.
    @raise DeviceTestError: when the resource is not found under any root.
    """
    resource_file_path = resource_file_path.replace("\\", "/")
    if os.path.isabs(resource_file_path):
        if isdir is None:
            kind_matches = os.path.exists(resource_file_path)
        elif isdir:
            kind_matches = os.path.isdir(resource_file_path)
        else:
            kind_matches = os.path.isfile(resource_file_path)
        if kind_matches:
            return os.path.abspath(resource_file_path)

    def _search_under(root, miss_message):
        # Search one root; log the given message when nothing is found.
        root = os.path.abspath(root)
        hit = travesal_folder(root, resource_file_path, isdir)
        if hit is None:
            log.debug(miss_message % (resource_file_path, root))
        return hit

    located = None
    from xdevice import EnvPool
    if EnvPool.resource_path is not None:
        located = _search_under(
            EnvPool.resource_path,
            "Not found [%s] in env pool path %s, "
            "continue to find in project resource path.")

    if located is None:
        # The default is "" (not None), so when the attribute is unset the
        # search root resolves to the current working directory.
        ecotest_root = getattr(sys, "ecotest_resource_path", "")
        if ecotest_root is not None:
            located = _search_under(
                ecotest_root,
                "Not found [%s] in ecotest path %s, "
                "continue to find in suit path.")

    from devicetest.core.variables import DeccVariable
    if located is None:
        located = _search_under(
            DeccVariable.project.resource_path,
            "Not found [%s] in product path %s, "
            "continue to find in project resource path.")

    if located is None:
        located = _search_under(
            DeccVariable.project.test_suite_path,
            "Not found [%s] in product path %s, "
            "continue to find in suit resource path.")

    if located is None:
        located = _search_under(
            get_project_path(),
            "Not found [%s] in product path %s, "
            "continue to find in project path.")

    if located is None:
        err_msg = ErrorMessage.Common.Code_0201008.format(resource_file_path)
        log.error(err_msg)
        raise DeviceTestError(err_msg)
    log.debug("get resource path:{}".format(located))
    return located
140
141
def travesal_folder(folder, folder_file_path, isdir=False):
    """
    Recursively search folder for an entry whose path ends with
    folder_file_path.

    The direct join of folder and folder_file_path is tried first. If that
    does not match, every child of folder is inspected (".svn" entries are
    skipped) and sub-directories are searched recursively, in os.listdir()
    order. The first match wins.

    @param folder: directory in which the search starts.
    @param folder_file_path: relative path (or name) to look for.
    @param isdir: Obtain the directory: True; Obtain the file: False;
                  None: Ignore the file type
    @return: path of the first match, or None when nothing matches or
             folder does not exist.
    """
    accept_dir = isdir is None or bool(isdir)
    accept_file = not isdir

    direct = os.path.join(folder, folder_file_path)
    if isdir is None:
        direct_hit = os.path.exists(direct)
    elif isdir:
        direct_hit = os.path.isdir(direct)
    else:
        direct_hit = os.path.isfile(direct)
    if direct_hit:
        return os.path.abspath(direct)

    if not os.path.exists(folder):
        return None

    suffix = os.sep + folder_file_path
    for entry in os.listdir(folder):
        if entry == ".svn":
            continue

        candidate = os.path.join(folder, entry)
        if os.path.isdir(candidate):
            if accept_dir and candidate.endswith(suffix):
                return candidate
            nested_hit = travesal_folder(candidate, folder_file_path, isdir)
            if nested_hit is not None:
                return nested_hit
            continue

        if accept_file and os.path.isfile(candidate) \
                and candidate.endswith(suffix):
            return candidate

    return None
172
173
def os_open_file_write(file_path, content, mode="w"):
    """
    Write content to file_path, creating the file and its parent directory
    when they do not exist yet.

    New files are requested with owner read/write permission bits
    (stat.S_IWUSR | stat.S_IRUSR). In "w" modes existing content is
    discarded before writing.

    @param file_path: path of the file to write.
    @param content: data to write; str for text modes, bytes for binary
                    modes such as "wb".
    @param mode: mode passed to os.fdopen, "w" by default.
    @raise DeviceTestError: when creating the directory, opening the file
                            or writing fails.
    """
    try:
        flags = os.O_WRONLY | os.O_CREAT
        if "w" in mode:
            # os.fdopen() does not truncate an already opened descriptor,
            # so os.open() has to do it. Without O_TRUNC, rewriting a file
            # with shorter content leaves the tail of the old content.
            flags |= os.O_TRUNC
        modes = stat.S_IWUSR | stat.S_IRUSR
        dir_path = os.path.dirname(file_path)
        # dir_path is "" for a bare file name; os.makedirs("") would raise.
        if dir_path and not os.path.isdir(dir_path):
            os.makedirs(dir_path, exist_ok=True)
        with os.fdopen(os.open(file_path, flags, modes), mode) as fout:
            fout.write(content)
    except Exception as error:
        err_msg = ErrorMessage.Common.Code_0201009
        log.error(err_msg, is_traceback=True)
        raise DeviceTestError(err_msg) from error
188
def os_open_file_read(file_path, mode="r"):
    """
    Read and return the whole content of file_path.

    @param file_path: path of the file to read.
    @param mode: mode passed to os.fdopen, "r" by default.
    @return: the file content as returned by read() for the given mode.
    @raise DeviceTestError: when the file does not exist or any other error
                            occurs while opening or reading it.
    """
    try:
        descriptor = os.open(file_path, os.O_RDONLY,
                             stat.S_IWUSR | stat.S_IRUSR)
        with os.fdopen(descriptor, mode) as stream:
            data = stream.read()
        return data
    except FileNotFoundError as error:
        err_msg = ErrorMessage.Common.Code_0201001.format(ErrorCategory.Environment, file_path)
        log.error(err_msg, is_traceback=True)
        raise DeviceTestError(err_msg) from error
    except Exception as error:
        err_msg = ErrorMessage.Common.Code_0201010
        log.error(err_msg, is_traceback=True)
        raise DeviceTestError(err_msg) from error
203
204
def save_file(file_path, content):
    """
    Write content to file_path in binary mode ("wb") by delegating to
    os_open_file_write().

    @param file_path: path of the file to write.
    @param content: data to write in binary mode.
    """
    os_open_file_write(file_path, content, mode="wb")
207
208
def create_dir(create_path):
    """
    Create a directory, including missing parents, unless the path already
    exists.

    A leading "~" is expanded and the path is made absolute before the check.
    Args:
        create_path: The path of the directory to create.
    """
    target = os.path.expanduser(create_path)
    target = os.path.abspath(target)
    if os.path.exists(target):
        # Anything already at this path (directory or file) is left alone.
        return
    os.makedirs(target, exist_ok=True)
218
219
def to_file(filename, content):
    """
    Generate a file: make sure the parent directory of filename exists, then
    write content in binary mode ("wb") through os_open_file_write().

    @param filename: path of the file to generate.
    @param content: data to write in binary mode.
    """
    dirname = os.path.dirname(filename)
    # dirname is "" for a bare file name and os.makedirs("") would raise, so
    # only create a directory when there is one to create. exist_ok avoids a
    # failure when the directory appears between the check and the call.
    if dirname and not os.path.isdir(dirname):
        os.makedirs(dirname, exist_ok=True)
    os_open_file_write(filename, content, "wb")
228
229
def delfile(filename):
    """
    Delete a file on a best-effort basis.

    A failed os.remove() is logged and retried once if the file still
    exists. The function then waits up to 5 x 0.1 seconds for the file to
    disappear and logs an error when it is still present. A failed retry is
    logged rather than raised.

    @param filename: path of the file to delete.
    """
    try:
        os.remove(filename)
    except Exception as exception:
        log.error(exception)
        if os.path.isfile(filename):
            # Retry with os.remove() on every platform: shutil.rmtree()
            # only works on directories and always fails for a regular file.
            try:
                os.remove(filename)
            except OSError as retry_error:
                log.error(retry_error)

    # Give the file system a short moment to drop the entry.
    for _ in range(5):
        if not os.path.isfile(filename):
            break
        time.sleep(0.1)

    if os.path.isfile(filename):
        log.error("Delete file %s failed." % filename)
250
251
def delfolder(dirname):
    """
    Delete a directory tree.

    shutil.rmtree() is tried once and, if it raised while the directory
    still exists, a second time. The function then waits up to 5 x 0.1
    seconds for the directory to disappear and logs an error when it is
    still present.

    @param dirname: path of the directory to delete.
    """
    try:
        shutil.rmtree(dirname)
    except Exception:
        if os.path.isdir(dirname):
            shutil.rmtree(dirname)

    # Poll a few times in case the removal is not visible immediately.
    waited = 0
    while waited < 5 and os.path.isdir(dirname):
        time.sleep(0.1)
        waited += 1

    if os.path.isdir(dirname):
        log.error("Delete folder %s failed." % dirname)
268
269
def copy_to_folder(src, des):
    """
    Copy a file, or the children of a folder, into another folder.

    When src is a file it is copied into des. When src is a folder, each of
    its files is copied into des and each of its sub-folders is copied
    recursively to des/<sub-folder name>. des is created when missing. The
    process working directory is not changed.

    @param src: path of the source file or folder.
    @param des: path of the destination folder.
    @return: None. A missing src, or a des that is not a folder, is logged
             and nothing is copied.
    """
    src = os.path.normpath(src)
    des = os.path.normpath(des)
    if not os.path.exists(src):
        log.error("No found [%s]" % src)
        return
    if not os.path.exists(des):
        create_dir(des)
    if not os.path.isdir(des):
        log.error("[%s] is not a folder." % des)
        return

    if not os.path.isdir(src):
        shutil.copy(src, des)
        return

    # List the children of the SOURCE folder. No os.chdir() here: changing
    # the working directory would break a relative src in the joins below.
    for child_name in os.listdir(src):
        source = os.path.join(src, child_name)
        if os.path.isfile(source):
            shutil.copy(source, des)
        elif os.path.isdir(source):
            shutil.copytree(source, os.path.join(des, child_name))
296
297
def delete_file_folder(src):
    """
    @summary: Delete a file with delfile() or a directory with delfolder().
              A path that is neither a file nor a directory is ignored.
    """
    if os.path.isfile(src):
        delfile(src)
        return
    if os.path.isdir(src):
        delfolder(src)
307
308
def get_file_md5(file_name: str):
    """
    @summary: Get MD5 hash of a file.
    :param file_name: path of the file to hash.
    :return: hexadecimal MD5 digest of the file content.
    """
    open_flags = os.O_RDONLY
    if platform.system() == "Windows":
        # On Windows the descriptor is opened with O_BINARY as well.
        open_flags |= os.O_BINARY
    descriptor = os.open(file_name, open_flags, 0o644)
    digest = hashlib.md5()  # create the md5 object
    with os.fdopen(descriptor, mode="rb") as stream:
        # Feed the file to the digest in 4096-byte chunks.
        chunk = stream.read(4096)
        while chunk:
            digest.update(chunk)  # update the md5 object
            chunk = stream.read(4096)
    return digest.hexdigest()
328
329
def compress_and_remove(folder_path: str, archive_name: str) -> bool:
    """
    @summary: Compress a folder into a zip archive, then remove the folder.
    :param folder_path: folder whose content is archived and then deleted.
    :param archive_name: archive path without extension, as passed to
                         shutil.make_archive.
    :return: True on success; False when archiving or removal raised (the
             exception is logged).
    """
    succeeded = False
    try:
        shutil.make_archive(archive_name, format='zip', root_dir=folder_path)
        shutil.rmtree(folder_path)
        succeeded = True
    except Exception as e:
        log.error(e)
    return succeeded
344
345
def unzip_file(zip_path, extract_to):
    """
    Extract a ZIP file into a target directory.

    :param zip_path: path of the ZIP file
    :param extract_to: target directory to extract into
    """
    # Make sure the target directory exists.
    target_missing = not os.path.exists(extract_to)
    if target_missing:
        os.makedirs(extract_to)

    # Open the ZIP file and extract every member into the target directory.
    with zipfile.ZipFile(zip_path, mode='r') as archive:
        archive.extractall(path=extract_to)
361