1#!/usr/bin/env python3
2# coding: utf-8
3
4"""
5Copyright (c) 2023 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18Description: execute test tasks
19"""
20import copy
21import json
22import logging
23import os
24import re
25import shutil
26import signal
27import subprocess
28import tarfile
29import time
30import zipfile
31from contextlib import contextmanager
32
33import json5
34
35import options
36import utils
37
38
39class FullTest:
40    @staticmethod
41    def prepare_full_task(task, test_name):
42        if test_name in task.full_compilation_info:
43            full_task = task.full_compilation_info[test_name]
44        else:
45            full_task = options.FullCompilationInfo()
46            full_task.name = test_name
47            task.full_compilation_info[test_name] = full_task
48        return full_task
49
50    @staticmethod
51    def full_compile(task, is_debug):
52        test_name = "full_compile"
53        logging.info(f"==========> Running {test_name} for task: {task.name}")
54        clean_compile(task)
55
56        full_task = FullTest.prepare_full_task(task, test_name)
57        [stdout, stderr] = compile_project(task, is_debug)
58        passed = validate(full_task, task, is_debug, stdout, stderr, f'{test_name}')
59        if passed:
60            backup_compile_output(task, is_debug)
61
62        return passed
63
64    @staticmethod
65    def compile_full_import_ordinary_ohpm_package(task, is_debug):
66        test_name = 'import_ordinary_ohpm_package'
67        clean_compile(task)
68        full_task = FullTest.prepare_full_task(task, test_name)
69        info = full_task.debug_info if is_debug else full_task.release_info
70        logging.info(f"==========> Running {test_name} for task: {task.name}")
71
72        try:
73            modify_file = os.path.join(task.path, *task.inc_modify_file)
74            patch_content = (options.configs.get('patch_content').get('ohpm_package_patch')
75                             .get('ordinary_package'))
76            head_content = patch_content.get('head')
77            tail_content = patch_content.get('tail')
78            utils.add_content_to_file(modify_file, head_content, tail_content)
79            build_passed, build_time = is_build_module_successful(task, is_debug, info,
80                                                                  '', f'full_compile_{test_name}')
81            if not build_passed:
82                return
83            package_name = patch_content.get('name')
84            is_included = is_npm_txt_included_ohpm_package(info, task, is_debug, package_name)
85            if is_included:
86                info.result = options.TaskResult.passed
87                info.time = build_time
88        finally:
89            utils.remove_content_from_file(modify_file, head_content, tail_content)
90
91    @staticmethod
92    def compile_full_import_special_ohpm_package(task, is_debug):
93        test_name = 'import_special_ohpm_package'
94        clean_compile(task)
95        full_task = FullTest.prepare_full_task(task, test_name)
96        info = full_task.debug_info if is_debug else full_task.release_info
97        logging.info(f"==========> Running {test_name} for task: {task.name}")
98
99        try:
100            modify_file = os.path.join(task.path, *task.inc_modify_file)
101            patch_content = (options.configs.get('patch_content').get('ohpm_package_patch')
102                             .get('special_package'))
103            head_content = patch_content.get('head')
104            tail_content = patch_content.get('tail')
105            utils.add_content_to_file(modify_file, head_content, tail_content)
106            build_passed, build_time = is_build_module_successful(task, is_debug, info,
107                                                                  '', f'full_compile_{test_name}')
108            if not build_passed:
109                logging.error(f'Test:{test_name} failed,due to full compilation failed')
110                return
111            package_name = patch_content.get('name')
112            disasm_file_path = get_disasm_abc_file(task, info, 'Hap')
113            is_contained = utils.file_contains_specified_fields(disasm_file_path, package_name)
114            if is_contained:
115                info.result = options.TaskResult.passed
116                info.time = build_time
117        finally:
118            utils.remove_content_from_file(modify_file, head_content, tail_content)
119
120    @staticmethod
121    def compile_full_import_static_library(task, is_debug):
122        test_name = 'import_static_library'
123        clean_compile(task)
124        full_task = FullTest.prepare_full_task(task, test_name)
125        info = full_task.debug_info if is_debug else full_task.release_info
126        logging.info(f"==========> Running {test_name} for task: {task.name}")
127
128        with manage_module_import_and_export_handle(task, 'Har'):
129            build_passed, build_time = is_build_module_successful(task, is_debug, info, '',
130                                                                  f'full_compile_{test_name}')
131            if not build_passed:
132                logging.error(f'Test:{test_name} failed,due to full compilation failed')
133                return
134            pa_file = get_disasm_abc_file(task, info, 'Hap')
135            if not pa_file:
136                return
137            is_packaged = is_package_modules_to_module_abc(task, pa_file, task.har_module)
138            if is_packaged:
139                info.result = options.TaskResult.passed
140                info.time = build_time
141            else:
142                info.result = options.TaskResult.failed
143                info.error_message = f'Har was not properly packaged into module abc'
144
145    @staticmethod
146    def compile_full_import_share_library(task, is_debug):
147        test_name = 'import_share_library'
148        clean_compile(task)
149        full_task = FullTest.prepare_full_task(task, test_name)
150        info = full_task.debug_info if is_debug else full_task.release_info
151        logging.info(f"==========> Running {test_name} for task: {task.name}")
152
153        with manage_module_import_and_export_handle(task, 'Hsp'):
154            build_passed, build_time = is_build_module_successful(task, is_debug, info, '',
155                                                                  f'full_compile_{test_name}')
156            if not build_passed:
157                logging.error(f'Test:{test_name} failed,due to full compilation failed')
158                return
159            pa_file = get_disasm_abc_file(task, info, 'Hap')
160            if not pa_file:
161                return
162            is_packaged = is_package_modules_to_module_abc(task, pa_file, task.hsp_module)
163            if not is_packaged:
164                info.result = options.TaskResult.passed
165                info.time = build_time
166            else:
167                info.result = options.TaskResult.failed
168                info.error_message = f'Unexpected changes have occurred.Hsp should not be packaged into module abc'
169
170    @staticmethod
171    def compile_full_import_so_file(task, is_debug):
172        test_name = 'import_so_file'
173        clean_compile(task)
174        full_task = FullTest.prepare_full_task(task, test_name)
175        info = full_task.debug_info if is_debug else full_task.release_info
176        logging.info(f"==========> Running {test_name} for task: {task.name}")
177
178        with manage_module_import_and_export_handle(task, 'Cpp'):
179            [stdout, stderr] = compile_project(task, is_debug)
180            is_success, build_time = is_compile_success(stdout)
181            if not is_success:
182                logging.error(f'Test:{test_name} failed,due to full compilation failed')
183                info.result = options.TaskResult.failed
184                info.error_message = stderr
185                return
186            validate(full_task, task, is_debug, stdout, stderr)
187
188    @staticmethod
189    def compile_full_has_syntax_error_in_js(task, is_debug):
190        test_name = 'has_syntax_error_in_js'
191        clean_compile(task)
192        full_task = FullTest.prepare_full_task(task, test_name)
193        info = full_task.debug_info if is_debug else full_task.release_info
194        logging.info(f"==========> Running {test_name} for task: {task.name}")
195
196        try:
197            add_or_delete_js_file(task, 1, True)
198            patch_lines_error = options.configs.get('patch_content').get('patch_lines_error')
199            expected_error = patch_lines_error.get('expected_error')
200            [stdout, stderr] = compile_project(task, is_debug)
201            is_passed = is_get_expected_error(info, stderr, expected_error)
202            if is_passed:
203                info.result = options.TaskResult.passed
204
205        finally:
206            add_or_delete_js_file(task, 0)
207
208    @staticmethod
209    def compile_full_use_normalize_ohmurl(task, is_debug):
210        test_name = 'use_normalize_ohmurl'
211        clean_compile(task)
212        full_task = FullTest.prepare_full_task(task, test_name)
213        info = full_task.debug_info if is_debug else full_task.release_info
214        logging.info(f"==========> Running {test_name} for task: {task.name}")
215
216        modify_normalize_ohmurl_options(task, 1)
217        try:
218            with manage_module_import_and_export_handle(task, 'Har'):
219                is_build_passed, build_time = is_build_module_successful(task, is_debug, info,
220                                                                         '', f'full_compile_{test_name}')
221                if not is_build_passed:
222                    logging.error(f'Test:{test_name},full compilation failed with use normalize ohmurl option')
223                    return
224                is_passed = is_normalized_ohm_url(task, is_debug, info)
225                if is_passed:
226                    info.result = options.TaskResult.passed
227                    info.time = build_time
228        finally:
229            modify_normalize_ohmurl_options(task, 0)
230
231    @staticmethod
232    def compile_full_module_name_is_inconsistent(task, is_debug):
233        test_name = 'module_name_is_inconsistent'
234        clean_compile(task)
235        full_task = FullTest.prepare_full_task(task, test_name)
236        info = full_task.debug_info if is_debug else full_task.release_info
237        logging.info(f"==========> Running {test_name} for task: {task.name}")
238
239        oh_package_json_path = os.path.join(task.path, *task.hap_module_path, 'oh-package.json5')
240        json_data = {}
241        try:
242            with open(oh_package_json_path, 'r+', encoding='utf-8') as json_file:
243                json_data = json5.load(json_file)
244                bak_data = copy.deepcopy(json_data)
245                dependencies_dic = json_data["dependencies"]
246                patch_lines = options.configs.get('patch_content').get('patch_lines_1')
247                dependency_name = utils.extract_library_names(patch_lines.get('har').get('head'))
248                module_name = task.har_module.capitalize()
249                dependencies_dic[dependency_name] = os.path.normpath(f"file:../{module_name}")
250                json_file.seek(0)
251                json.dump(json_data, json_file, indent=4)
252                json_file.truncate()
253            sync_project(task)
254
255            with manage_module_import_and_export_handle(task, 'Har'):
256                is_build_successful, build_time = is_build_module_successful(task, is_debug, info,
257                    '', 'full_compile_module_name_is_inconsistent')
258                if is_build_successful:
259                    info.result = options.TaskResult.passed
260                    info.time = build_time
261        finally:
262            with open(oh_package_json_path, 'w', encoding='utf-8') as json_file:
263                json.dump(bak_data, json_file, indent=4)
264            sync_project(task)
265
266
267class IncrementalTest:
268    @staticmethod
269    def validate_module_name_change(task, inc_task, is_debug, stdout, stderr, new_module_name):
270        output_file = get_compile_output_file_path(task, is_debug, options.OutputType.unsigned)
271        output_dir = os.path.dirname(output_file)
272        output_file_name = os.path.basename(output_file)
273        output_file_name_items = output_file_name.split(
274            '-')  # hap name format: entry-default.hap
275        output_file_name_items[0] = new_module_name
276        output_file_name = '-'.join(output_file_name_items)
277        new_module_name_output_file = os.path.join(
278            output_dir, output_file_name)
279
280        logging.debug(f"new module hap file: {new_module_name_output_file}")
281
282        passed = validate(inc_task, task, is_debug, stdout,
283                          stderr, 'incremental_compile_change_module_name',
284                          new_module_name_output_file)
285        logging.debug(f"validate new module hap file, passed {passed}")
286        if not passed:
287            return
288
289        if is_debug:
290            inc_info = inc_task.debug_info
291        else:
292            inc_info = inc_task.release_info
293        uncompressed_output_file = new_module_name_output_file + '.uncompressed'
294        with zipfile.ZipFile(new_module_name_output_file, 'r') as zip_ref:
295            zip_ref.extractall(uncompressed_output_file)
296
297        abc_path = os.path.join(uncompressed_output_file, 'ets')
298        modules_abc_path = os.path.join(abc_path, 'modules.abc')
299        modules_pa = disasm_abc(task, modules_abc_path)
300        if not modules_pa or not os.path.exists(modules_pa):
301            inc_info.result = options.TaskResult.failed
302            inc_info.error_message = 'ark_disasm failed, module name change not verified'
303            return
304
305        func_str = ''
306        with open(modules_pa, 'r', encoding='utf-8') as pa:
307            line = pa.readline()
308            while line:
309                if '.function' in line.strip():
310                    func_str = line.strip()
311                    break
312                line = pa.readline()
313
314        func_define_items = func_str.split('.')
315        if not new_module_name in func_define_items:
316            inc_info.result = options.TaskResult.failed
317            inc_info.error_message = f'expected entry name {new_module_name} in function name, \
318                                     actual function name: {func_str}'
319
320        shutil.rmtree(uncompressed_output_file)
321
322    @staticmethod
323    def is_file_in_modified_files(task_type, backup_file_relative_path, modified_cache_files):
324        if 'stage' in task_type:
325            return backup_file_relative_path in modified_cache_files
326        else:
327            non_temporary_path = backup_file_relative_path.split("temporary")[
328                1].lstrip(os.path.sep)
329            logging.debug(f"non_temporary_path: {non_temporary_path}")
330            for file in modified_cache_files:
331                logging.debug(f"modified_cache_files file: {file}")
332                if non_temporary_path in file:
333                    return True
334        return False
335
336    @staticmethod
337    def validate_compile_incremental_file(task, inc_task, is_debug, modified_files, module=''):
338        module_path = utils.get_module_path(task, module)
339        if is_debug:
340            cache_path = os.path.join(
341                task.path, *module_path, *task.build_path, *task.cache_path, 'debug')
342            backup_path = task.backup_info.cache_debug
343            inc_info = inc_task.debug_info
344        else:
345            cache_path = os.path.join(
346                task.path, *module_path, *task.build_path, *task.cache_path, 'release')
347            backup_path = task.backup_info.cache_release
348            inc_info = inc_task.release_info
349
350        validate_cache_file(task, inc_info, modified_files, cache_path, backup_path)
351
352    @staticmethod
353    def prepare_incremental_task(task, test_name):
354        if test_name in task.incre_compilation_info:
355            inc_task = task.incre_compilation_info[test_name]
356        else:
357            inc_task = options.IncCompilationInfo()
358            inc_task.name = test_name
359            task.incre_compilation_info[test_name] = inc_task
360        return inc_task
361
362    @staticmethod
363    def compile_incremental_no_modify(task, is_debug):
364        test_name = 'no_change'
365        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
366
367        logging.info(f"==========> Running {test_name} for task: {task.name}")
368        [stdout, stderr] = compile_project(task, is_debug)
369        passed = validate(inc_task, task, is_debug, stdout, stderr, 'incremental_compile_no_change')
370        if passed:
371            IncrementalTest.validate_compile_incremental_file(
372                task, inc_task, is_debug, [])
373
374    @staticmethod
375    def compile_incremental_add_oneline(task, is_debug):
376        test_name = 'add_oneline'
377        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
378
379        logging.info(f"==========> Running {test_name} for task: {task.name}")
380        modify_file_item = task.inc_modify_file
381        modify_file = os.path.join(task.path, *modify_file_item)
382        modify_file_backup = modify_file + ".bak"
383        shutil.copyfile(modify_file, modify_file_backup)
384
385        with open(modify_file, 'a', encoding='utf-8') as file:
386            file.write(options.configs.get('patch_content').get(
387                'patch_lines_2').get('tail'))
388
389        [stdout, stderr] = compile_project(task, is_debug)
390        passed = validate(inc_task, task, is_debug, stdout, stderr, 'incremental_compile_add_oneline')
391        if passed:
392            modified_files = [os.path.join(*modify_file_item)]
393            IncrementalTest.validate_compile_incremental_file(
394                task, inc_task, is_debug, modified_files)
395
396        shutil.move(modify_file_backup, modify_file)
397
398    @staticmethod
399    def compile_incremental_add_file(task, is_debug):
400        test_name = 'add_file'
401        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
402
403        logging.info(f"==========> Running {test_name} for task: {task.name}")
404        modify_file_item = task.inc_modify_file
405        modify_file = os.path.join(task.path, *modify_file_item)
406        modify_file_backup = modify_file + ".bak"
407        shutil.copyfile(modify_file, modify_file_backup)
408
409        modify_dir = os.path.dirname(modify_file)
410        if 'js' in task.type:
411            patch_content = options.configs.get(
412                'patch_content').get('patch_new_file_js')
413            new_file_name = patch_content.get('name')
414            new_file_content = patch_content.get('content')
415        else:
416            patch_content = options.configs.get(
417                'patch_content').get('patch_new_file_ets')
418            new_file_name = patch_content.get('name')
419            new_file_content = patch_content.get('content')
420        new_file = os.path.join(modify_dir, new_file_name)
421
422        with open(new_file, 'w', encoding='utf-8') as file:
423            file.writelines(new_file_content)
424
425        with open(modify_file, 'r+', encoding='utf-8') as file:
426            old_content = file.read()
427            file.seek(0)
428            patch_lines = options.configs.get(
429                'patch_content').get('patch_lines_1').get('js')
430            file.write(patch_lines.get('head'))
431            file.write(old_content)
432            file.write(patch_lines.get('tail'))
433
434        [stdout, stderr] = compile_project(task, is_debug)
435        passed = validate(inc_task, task, is_debug, stdout, stderr, 'incremental_compile_add_file')
436        if passed:
437            modified_files = [os.path.join(*modify_file_item)]
438            IncrementalTest.validate_compile_incremental_file(
439                task, inc_task, is_debug, modified_files)
440
441        shutil.move(modify_file_backup, modify_file)
442        os.remove(new_file)
443
444    @staticmethod
445    def compile_incremental_add_nonexistent_file(task, is_debug):
446        test_name = 'add_nonexistent_file'
447        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
448
449        logging.info(f"==========> Running {test_name} for task: {task.name}")
450
451        modify_file_item = task.inc_modify_file
452        modify_file = os.path.join(task.path, *modify_file_item)
453        modify_file_backup = modify_file + ".bak"
454        shutil.copyfile(modify_file, modify_file_backup)
455
456        with open(modify_file, 'r+', encoding='utf-8') as file:
457            old_content = file.read()
458            file.seek(0)
459            patch_lines = options.configs.get(
460                'patch_content').get('patch_lines_1').get('js')
461            file.write(patch_lines.get('head'))
462            file.write(old_content)
463            file.write(patch_lines.get('tail'))
464
465        info = inc_task.debug_info if is_debug else inc_task.release_info
466        expected_errors = options.configs.get('patch_content').get('patch_file_error').get('expected_error')
467        [stdout, stderr] = compile_project(task, is_debug)
468        passed = is_get_expected_error(info, stderr, expected_errors)
469        if passed:
470            logging.info("The first compilation file does not exist. The compilation fails as expected")
471
472            modify_dir = os.path.dirname(modify_file)
473            if 'js' in task.type:
474                patch_content = options.configs.get(
475                    'patch_content').get('patch_new_file_js')
476                new_file_name = patch_content.get('name')
477                new_file_content = patch_content.get('content')
478            else:
479                patch_content = options.configs.get(
480                    'patch_content').get('patch_new_file_ets')
481                new_file_name = patch_content.get('name')
482                new_file_content = patch_content.get('content')
483            new_file = os.path.join(modify_dir, new_file_name)
484
485            with open(new_file, 'w', encoding='utf-8') as file:
486                file.writelines(new_file_content)
487
488            [stdout, stderr] = compile_project(task, is_debug)
489            passed = validate(inc_task, task, is_debug, stdout, stderr, 'incremental_compile_add_nonexistent_file')
490            if passed:
491                modified_files = [os.path.join(*modify_file_item)]
492                IncrementalTest.validate_compile_incremental_file(
493                    task, inc_task, is_debug, modified_files)
494            os.remove(new_file)
495        shutil.move(modify_file_backup, modify_file)
496
497    @staticmethod
498    def compile_incremental_delete_file(task, is_debug):
499        test_name = 'delete_file'
500        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
501
502        logging.info(f"==========> Running {test_name} for task: {task.name}")
503        # this test is after 'add_file', and in test 'add_file' already done remove file,
504        # so here just call compile
505        [stdout, stderr] = compile_project(task, is_debug)
506        passed = validate(inc_task, task, is_debug, stdout, stderr, 'incremental_compile_delete_file')
507        if passed:
508            modify_file_item = task.inc_modify_file
509            modified_files = [os.path.join(*modify_file_item)]
510            IncrementalTest.validate_compile_incremental_file(
511                task, inc_task, is_debug, modified_files)
512
513    @staticmethod
514    def compile_incremental_reverse_hap_mode(task, is_debug):
515        test_name = 'reverse_hap_mode'
516        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
517
518        logging.info(f"==========> Running {test_name} for task: {task.name}")
519        hap_mode = not is_debug
520        [stdout, stderr] = compile_project(task, hap_mode)
521        validate(inc_task, task, hap_mode, stdout, stderr, 'incremental_compile_reverse_hap_mode')
522
523    @staticmethod
524    def compile_incremental_modify_module_name(task, is_debug):
525        if 'stage' not in task.type:
526            return
527
528        test_name = 'change_module_name'
529        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
530
531        logging.info(f"==========> Running {test_name} for task: {task.name}")
532        # modify build-profile.json5
533        profile_file = os.path.join(task.path, 'build-profile.json5')
534        profile_file_backup = profile_file + ".bak"
535        shutil.copyfile(profile_file, profile_file_backup)
536
537        with open(profile_file, 'r', encoding='utf-8') as file:
538            profile_data = json5.load(file)
539        new_module_name = "new_entry"
540        logging.debug(f"profile_data is: {profile_data}")
541        for module in profile_data['modules']:
542            if module['name'] == task.hap_module:
543                module['name'] = new_module_name
544                break
545        with open(profile_file, 'w') as file:
546            json5.dump(profile_data, file)
547
548        # modify module.json5 for stage mode
549        config_file_dir = os.path.join(task.path, *task.hap_module_path, 'src', 'main')
550        config_file = os.path.join(config_file_dir, 'module.json5')
551        config_file_backup = config_file + ".bak"
552        shutil.copyfile(config_file, config_file_backup)
553
554        with open(config_file, 'r') as file:
555            config_data = json5.load(file)
556        config_data['module']['name'] = new_module_name
557        with open(config_file, 'w') as file:
558            json5.dump(config_data, file)
559
560        try:
561            cmd = get_hvigor_compile_cmd(task, is_debug, 'Hap', new_module_name)
562            [stdout, stderr] = compile_project(task, is_debug, cmd)
563            IncrementalTest.validate_module_name_change(
564                task, inc_task, is_debug, stdout, stderr, new_module_name)
565        except Exception as e:
566            logging.exception(e)
567        finally:
568            shutil.move(profile_file_backup, profile_file)
569            shutil.move(config_file_backup, config_file)
570
571    @staticmethod
572    def compile_incremental_build_modify_error_then_fix(task, is_debug):
573        test_name = 'modify_error_then_fix'
574        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
575        info = inc_task.debug_info if is_debug else inc_task.release_info
576        logging.info(f"==========> Running {test_name} for task: {task.name}")
577
578        modify_file = os.path.join(task.path, *task.inc_modify_file)
579        patch_lines_error = options.configs.get('patch_content').get('patch_lines_error')
580        error_content = patch_lines_error.get('content')
581        with open(modify_file, 'a', encoding='utf-8') as file:
582            file.write(error_content)
583
584        try:
585            [stdout, stderr] = compile_project(task, is_debug)
586            is_passed = is_get_expected_error(info, stderr, patch_lines_error.get('expected_error'))
587            if not is_passed:
588                logging.error(f"task: {task.name}failed to get expected error, skip second build")
589                return
590        finally:
591            utils.remove_content_from_file(modify_file, '',
592                                           patch_lines_error.get('content'))
593        is_build_successful, build_time = is_build_module_successful(task, is_debug,
594            info, '', 'incremental_compile_modify_error_then_fix')
595        if is_build_successful:
596            info.result = options.TaskResult.passed
597            info.time = build_time
598            modify_file_item = task.inc_modify_file
599            modified_files = [os.path.join(*modify_file_item)]
600            IncrementalTest.validate_compile_incremental_file(
601                task, inc_task, is_debug, modified_files)
602
603    @staticmethod
604    def compile_incremental_build_add_error_page(task, is_debug):
605        test_name = 'add_error_page_then_fix'
606        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
607        info = inc_task.debug_info if is_debug else inc_task.release_info
608        logging.info(f"==========> Running {test_name} for task: {task.name}")
609
610        add_or_delete_page(task, 1, True)
611        try:
612            expected_errors = options.configs.get('patch_content').get('patch_lines_error').get('content')
613            [stdout, stderr] = compile_project(task, is_debug)
614            if not is_get_expected_error(info, stderr, expected_errors):
615                return
616            add_or_delete_page(task, 0, True)
617            second_incremental, build_time = is_build_module_successful(task, is_debug, info,
618                '', 'incremental_compile_add_error_page_then_fix')
619
620            if second_incremental:
621                info.result = options.TaskResult.passed
622                info.time = build_time
623                modify_file_item = task.inc_modify_file
624                modified_files = [os.path.join(*modify_file_item)]
625                IncrementalTest.validate_compile_incremental_file(
626                    task, inc_task, is_debug, modified_files)
627        finally:
628            add_or_delete_page(task, 0)
629
630    @staticmethod
631    def compile_incremental_build_add_error_non_page(task, is_debug):
632        test_name = 'add_error_non_page_then_fix'
633        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
634        info = inc_task.debug_info if is_debug else inc_task.release_info
635        logging.info(f"==========> Running {test_name} for task: {task.name}")
636
637        add_or_delete_js_file(task, 1, True)
638        try:
639            expected_errors = options.configs.get('patch_content').get('patch_lines_error').get('content')
640            [stdout, stderr] = compile_project(task, is_debug)
641            if not is_get_expected_error(info, stderr, expected_errors):
642                return
643            add_or_delete_js_file(task, 0, True)
644            second_incremental, build_time = is_build_module_successful(task, is_debug, info,
645                '', 'incremental_compile_add_error_non_page_then_fix')
646
647            if second_incremental:
648                info.result = options.TaskResult.passed
649                info.time = build_time
650                modify_file_item = task.inc_modify_file
651                modified_files = [os.path.join(*modify_file_item)]
652                IncrementalTest.validate_compile_incremental_file(
653                    task, inc_task, is_debug, modified_files)
654        finally:
655            add_or_delete_js_file(task, 0)
656
657    @staticmethod
658    def compile_incremental_build_modify_sdk_version(task, is_debug):
659        test_name = 'modify_sdk_version'
660        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
661        info = inc_task.debug_info if is_debug else inc_task.release_info
662        logging.info(f"==========> Running {test_name} for task: {task.name}")
663
664        try:
665            first_incremental, first_build_time = is_build_module_successful(task, is_debug, info)
666            if not first_incremental:
667                return
668            # The default project uses api12, change it to api11
669            modify_sdk_version(task, 11)
670            second_incremental, second_build_time = is_build_module_successful(task, is_debug, info,
671                '', 'incremental_compile_modify_sdk_version')
672
673            if second_incremental:
674                info.result = options.TaskResult.passed
675                info.time = first_build_time + second_build_time
676        finally:
677            modify_sdk_version(task, 12)
678
679    @staticmethod
680    def compile_incremental_build_entry_then_har(task, is_debug):
681        test_name = 'build_entry_then_har'
682        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
683        info = inc_task.debug_info if is_debug else inc_task.release_info
684        logging.info(f"==========> Running {test_name} for task: {task.name}")
685
686        with manage_module_import_and_export_handle(task, 'Har'):
687            first_incremental, first_build_time = is_build_module_successful(task, is_debug, info)
688            if not first_incremental:
689                return
690            second_incremental, second_build_time = is_build_module_successful(task, is_debug, info,
691                'Har', 'incremental_compile_build_entry_then_har')
692
693            if second_incremental:
694                info.result = options.TaskResult.passed
695                info.time = first_build_time + second_build_time
696                modify_file_item = task.inc_modify_file
697                modified_files = [os.path.join(*modify_file_item)]
698                IncrementalTest.validate_compile_incremental_file(
699                    task, inc_task, is_debug, modified_files)
700
701    @staticmethod
702    def compile_incremental_build_har_then_entry(task, is_debug):
703        test_name = 'build_har_then_entry'
704        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
705        info = inc_task.debug_info if is_debug else inc_task.release_info
706        logging.info(f"==========> Running {test_name} for task: {task.name}")
707
708        with manage_module_import_and_export_handle(task, 'Har'):
709            first_incremental, first_build_time = is_build_module_successful(task, is_debug, info, 'Har')
710            if not first_incremental:
711                return
712            second_incremental, second_build_time = is_build_module_successful(task, is_debug, info,
713                '', 'incremental_compile_build_har_then_entry')
714
715            if second_incremental:
716                info.result = options.TaskResult.passed
717                info.time = first_build_time + second_build_time
718                modify_file_item = task.inc_modify_file
719                modified_files = [os.path.join(*modify_file_item)]
720                IncrementalTest.validate_compile_incremental_file(
721                    task, inc_task, is_debug, modified_files)
722
723    @staticmethod
724    def compile_incremental_build_entry_then_hsp(task, is_debug):
725        test_name = 'build_entry_then_hsp'
726        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
727        info = inc_task.debug_info if is_debug else inc_task.release_info
728        logging.info(f"==========> Running {test_name} for task: {task.name}")
729
730        with manage_module_import_and_export_handle(task, 'Hsp'):
731            first_incremental, first_build_time = is_build_module_successful(task, is_debug, info)
732            if not first_incremental:
733                return
734            second_incremental, second_build_time = is_build_module_successful(task, is_debug, info,
735                'Hsp', 'incremental_compile_build_entry_then_hsp')
736
737            if second_incremental:
738                info.result = options.TaskResult.passed
739                info.time = first_build_time + second_build_time
740                modify_file_item = task.inc_modify_file
741                modified_files = [os.path.join(*modify_file_item)]
742                IncrementalTest.validate_compile_incremental_file(
743                    task, inc_task, is_debug, modified_files)
744
745    @staticmethod
746    def compile_incremental_build_hsp_then_entry(task, is_debug):
747        test_name = 'build_hsp_then_entry'
748        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
749        info = inc_task.debug_info if is_debug else inc_task.release_info
750        logging.info(f"==========> Running {test_name} for task: {task.name}")
751
752        with manage_module_import_and_export_handle(task, 'Hsp'):
753            first_incremental, first_build_time = is_build_module_successful(task, is_debug, info, 'Hsp')
754            if not first_incremental:
755                return
756            second_incremental, second_build_time = is_build_module_successful(task, is_debug, info,
757                '', 'incremental_compile_build_hsp_then_entry')
758
759            if second_incremental:
760                info.result = options.TaskResult.passed
761                info.time = first_build_time + second_build_time
762                modify_file_item = task.inc_modify_file
763                modified_files = [os.path.join(*modify_file_item)]
764                IncrementalTest.validate_compile_incremental_file(
765                    task, inc_task, is_debug, modified_files)
766
767    @staticmethod
768    def compile_incremental_build_hsp_then_ohos(task, is_debug):
769        if not is_debug or 'ohosTest' not in task.type:
770            return
771        test_name = 'build_hsp_then_ohos'
772        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
773        info = inc_task.debug_info if is_debug else inc_task.release_info
774        logging.info(f"==========> Running {test_name} for task: {task.name}")
775
776        with manage_module_import_and_export_handle(task, 'Hsp'):
777            first_incremental, first_build_time = is_build_module_successful(task, is_debug, info, 'Hsp')
778            if not first_incremental:
779                return
780            second_incremental, second_build_time = is_build_ohos_test_successful(task, info)
781
782            if second_incremental:
783                info.result = options.TaskResult.passed
784                info.time = first_build_time + second_build_time
785                modify_file_item = task.inc_modify_file
786                modified_files = [os.path.join(*modify_file_item)]
787                IncrementalTest.validate_compile_incremental_file(
788                    task, inc_task, is_debug, modified_files)
789
790    @staticmethod
791    def compile_incremental_build_entry_then_ohos(task, is_debug):
792        if not is_debug:
793            return
794        test_name = 'build_entry_then_ohos'
795        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
796        info = inc_task.debug_info if is_debug else inc_task.release_info
797        logging.info(f"==========> Running {test_name} for task: {task.name}")
798
799        first_incremental, first_build_time = is_build_module_successful(task, is_debug, info)
800        if not first_incremental:
801            return
802        second_incremental, second_build_time = is_build_ohos_test_successful(task, info)
803
804        if second_build_time:
805            info.result = options.TaskResult.passed
806            info.time = first_build_time + second_build_time
807            modify_file_item = task.inc_modify_file
808            modified_files = [os.path.join(*modify_file_item)]
809            IncrementalTest.validate_compile_incremental_file(
810                task, inc_task, is_debug, modified_files)
811
812    @staticmethod
813    def compile_incremental_build_entry_then_preview_build(task, is_debug):
814        test_name = 'build_entry_then_preview_build'
815        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)
816        info = inc_task.debug_info if is_debug else inc_task.release_info
817        logging.info(f"==========> Running {test_name} for task: {task.name}")
818
819        first_incremental, first_build_time = is_build_module_successful(task, is_debug, inc_task)
820        if not first_incremental:
821            return
822        second_incremental, second_build_time = preview_mode_build(info, task, is_debug)
823
824        if second_incremental:
825            info.result = options.TaskResult.passed
826            info.time = first_build_time + second_build_time
827            modify_file_item = task.inc_modify_file
828            modified_files = [os.path.join(*modify_file_item)]
829            IncrementalTest.validate_compile_incremental_file(
830                task, inc_task, is_debug, modified_files)
831
832
833class BytecodeHarTest:
834    @staticmethod
835    def prepare_bytecode_har_task(task, test_name):
836        if test_name in task.bytecode_har_compilation_info:
837            bytecode_har_task = task.bytecode_har_compilation_info[test_name]
838        else:
839            bytecode_har_task = options.BytecodeHarCompilationInfo()
840            bytecode_har_task.name = test_name
841            task.bytecode_har_compilation_info[test_name] = bytecode_har_task
842        return bytecode_har_task
843
844    @staticmethod
845    def build_bytecode_har(task, is_debug):
846        test_name = 'build_bytecode_har'
847        clean_compile(task)
848        bytecode_har_task = BytecodeHarTest.prepare_bytecode_har_task(task, test_name)
849        info = bytecode_har_task.debug_info if is_debug else bytecode_har_task.release_info
850        logging.info(f"==========> Running {test_name} for task: {task.name}")
851
852        modify_bytecode_har_config(task, 1)
853        try:
854            is_passed, build_time = is_build_module_successful(task, is_debug, info, 'BytecodeHar')
855            if is_passed:
856                info.result = options.TaskResult.passed
857                info.time = build_time
858        finally:
859            modify_bytecode_har_config(task, 0)
860
861    @staticmethod
862    def build_har_then_bytecode_har(task, is_debug):
863        if is_debug:
864            return
865        test_name = 'build_har_then_bytecode_har'
866        clean_compile(task)
867        bytecode_har_task = BytecodeHarTest.prepare_bytecode_har_task(task, test_name)
868        info = bytecode_har_task.release_info
869        logging.info(f"==========> Running {test_name} for task: {task.name}")
870
871        is_passed, build_time = is_build_module_successful(task, is_debug, info, 'Har')
872        if not is_passed:
873            logging.error(f'build {task.har_module} failed')
874            return
875
876        modify_bytecode_har_config(task, 1)
877        try:
878            is_passed, build_time = is_build_module_successful(task, is_debug, info, 'BytecodeHar')
879            if is_passed:
880                info.result = options.TaskResult.passed
881                info.time = build_time
882        finally:
883            modify_bytecode_har_config(task, 0)
884
885    @staticmethod
886    def import_bytecode_static_library(task, is_debug):
887        test_name = 'import_bytecode_static_library'
888        clean_compile(task)
889        bytecode_har_task = BytecodeHarTest.prepare_bytecode_har_task(task, test_name)
890        info = bytecode_har_task.debug_info if is_debug else bytecode_har_task.release_info
891        logging.info(f"==========> Running {test_name} for task: {task.name}")
892
893        modify_bytecode_har_config(task, 1)
894        try:
895            with manage_bytecode_har_dependency(task, is_debug, info, 'Har'):
896                cmd = get_hvigor_compile_cmd(task, is_debug)
897                [stdout, stderr] = compile_project(task, is_debug, cmd)
898                [is_success, time_string] = is_compile_success(stdout)
899                if not is_success:
900                    info.result = options.TaskResult.failed
901                    info.error_message = f'Full compile failed due to build {task.hap_module} module.'
902                    logging.error(f'build {task.hap_module} failed')
903                    return
904                else:
905                    info.result = options.TaskResult.passed
906                    info.time = collect_compile_time(time_string)
907                if options.arguments.run_haps:
908                    runtime_passed = run_compile_output(info, task, is_debug, 'import_bytecode_static_library')
909        finally:
910            modify_bytecode_har_config(task, 0)
911
912
913class ExternalTest:
914    @staticmethod
915    def prepare_current_task(task, test_name):
916        if test_name in task.external_compilation_info:
917            current_task = task.external_compilation_info[test_name]
918        else:
919            current_task = options.ExternalCompilationInfo()
920            current_task.name = test_name
921            task.external_compilation_info[test_name] = current_task
922        return current_task
923
924    @staticmethod
925    def get_external_task():
926        external_task = options.create_test_tasks(options.configs.get('external_haps'))[0]
927        return external_task
928
929    @staticmethod
930    def import_external_share_library(task, is_debug):
931        test_name = 'import_external_share_library'
932        external_task = ExternalTest.get_external_task()
933        clean_compile(task)
934        clean_compile(external_task)
935        current_task = ExternalTest.prepare_current_task(task, test_name)
936        info = current_task.debug_info if is_debug else current_task.release_info
937        logging.info(f"==========> Running {test_name} for task: {task.name}")
938
939        with manage_module_import_and_export_handle(task, 'ExternalHsp'):
940            cmd = get_hvigor_compile_cmd(task, is_debug, '')
941            [stdout, stderr] = compile_project(task, is_debug, cmd)
942            [is_success, time_string] = is_compile_success(stdout)
943            if not is_success:
944                info.result = options.TaskResult.failed
945                info.error_message = f'Full compile failed due to {task.hap_module} module'
946                logging.error(f'Full compile failed due to {task.hap_module} module')
947                return
948            passed = validate_compile_output(info, task, is_debug, '', '')
949            if not passed:
950                info.result = options.TaskResult.failed
951                info.error_message = f'Validate failed due to {task.hap_module} module'
952                logging.error(f'Validate failed due to {task.hap_module} module')
953                return
954            pa_file = get_disasm_abc_file(task, info, 'Hap')
955            if not pa_file:
956                return
957            is_packaged = is_package_modules_to_module_abc(task, pa_file, external_task.hsp_module)
958            if not is_packaged:
959                info.result = options.TaskResult.passed
960                info.time = collect_compile_time(time_string)
961            else:
962                logging.error(f'Unexpected changes have occurred.OutHsp should not be packaged into module abc')
963                info.result = options.TaskResult.failed
964                info.error_message = f'Unexpected changes have occurred.OutHsp should not be packaged into module abc'
965            if options.arguments.run_haps:
966                runtime_passed = run_compile_output(info, task, is_debug, 'import_external_share_library')
967
968    @staticmethod
969    def import_external_static_library(task, is_debug):
970        test_name = 'import_external_static_library'
971        external_task = ExternalTest.get_external_task()
972        clean_compile(task)
973        clean_compile(external_task)
974        current_task = ExternalTest.prepare_current_task(task, test_name)
975        info = current_task.debug_info if is_debug else current_task.release_info
976        logging.info(f"==========> Running {test_name} for task: {task.name}")
977
978        with manage_module_import_and_export_handle(task, 'ExternalHar'):
979            cmd = get_hvigor_compile_cmd(task, is_debug, '')
980            [stdout, stderr] = compile_project(task, is_debug, cmd)
981            [is_success, time_string] = is_compile_success(stdout)
982            if not is_success:
983                info.result = options.TaskResult.failed
984                info.error_message = f'Full compile failed due to {task.hap_module} module'
985                logging.error(f'Full compile failed due to {task.hap_module} module')
986                return
987            passed = validate_compile_output(info, task, is_debug, '', '')
988            if not passed:
989                info.result = options.TaskResult.failed
990                info.error_message = f'Validate failed due to {task.hap_module} module'
991                logging.error(f'Validate failed due to {task.hap_module} module')
992                return
993            pa_file = get_disasm_abc_file(task, info, 'Hap')
994            if not pa_file:
995                return
996            is_packaged = is_package_modules_to_module_abc(task, pa_file, external_task.har_module)
997            if is_packaged:
998                info.result = options.TaskResult.passed
999                info.time = collect_compile_time(time_string)
1000            else:
1001                logging.error(f'OutHar was not properly packaged into module abc')
1002                info.result = options.TaskResult.failed
1003                info.error_message = f'OutHar was not properly packaged into module abc'
1004            if options.arguments.run_haps:
1005                runtime_passed = run_compile_output(info, task, is_debug, 'import_external_static_library')
1006
1007    @staticmethod
1008    def full_compile_external_static_library(task, is_debug):
1009        if is_debug:
1010            return
1011        test_name = 'full_compile_external_static_library'
1012        external_task = ExternalTest.get_external_task()
1013        clean_compile(task)
1014        clean_compile(external_task)
1015        current_task = ExternalTest.prepare_current_task(task, test_name)
1016        info = current_task.release_info
1017        logging.info(f"==========> Running {test_name} for task: {task.name}")
1018
1019        with manage_module_import_and_export_handle(task, 'ExternalHar'):
1020            cmd = get_hvigor_compile_cmd(external_task, is_debug, 'Har')
1021            [stdout, stderr] = compile_project(external_task, is_debug, cmd)
1022            [is_success, time_string] = is_compile_success(stdout)
1023            if not is_success:
1024                info.result = options.TaskResult.failed
1025                info.error_message = f'Full compile failed due to {external_task.har_module} module'
1026                logging.error(f'Full compile failed due to {external_task.har_module} module')
1027                return
1028            passed = validate_compile_output(info, external_task, is_debug, '', 'Har')
1029            if not passed:
1030                info.result = options.TaskResult.failed
1031                info.error_message = f'Validate failed due to {external_task.har_module} module'
1032                logging.error(f'Validate failed due to {external_task.har_module} module')
1033            else:
1034                info.result = options.TaskResult.passed
1035                info.time = collect_compile_time(time_string)
1036
1037    @staticmethod
1038    def full_compile_external_share_library(task, is_debug):
1039        test_name = 'full_compile_external_share_library'
1040        external_task = ExternalTest.get_external_task()
1041        clean_compile(task)
1042        clean_compile(external_task)
1043        current_task = ExternalTest.prepare_current_task(task, test_name)
1044        info = current_task.debug_info if is_debug else current_task.release_info
1045        logging.info(f"==========> Running {test_name} for task: {task.name}")
1046
1047        with manage_module_import_and_export_handle(task, 'ExternalHsp'):
1048            cmd = get_hvigor_compile_cmd(external_task, is_debug, 'Hsp')
1049            [stdout, stderr] = compile_project(external_task, is_debug, cmd)
1050            [is_success, time_string] = is_compile_success(stdout)
1051            if not is_success:
1052                info.result = options.TaskResult.failed
1053                info.error_message = f'Full compile failed due to {external_task.hsp_module} module'
1054                logging.error(f'Full compile failed due to {external_task.hsp_module} module')
1055                return
1056            passed = validate_compile_output(info, external_task, is_debug, '', 'Hsp')
1057            if not passed:
1058                info.result = options.TaskResult.failed
1059                info.error_message = f'Validate failed due to {external_task.hsp_module} module'
1060                logging.error(f'Validate failed due to {external_task.hsp_module} module')
1061            else:
1062                info.result = options.TaskResult.passed
1063                info.time = collect_compile_time(time_string)
1064
1065
1066class PreviewTest:
1067    @staticmethod
1068    def validate_preview_incremental_file(task, preview_task_info, is_debug, modified_files, module=''):
1069        module_path = utils.get_module_path(task, module)
1070        cache_path = os.path.join(
1071            task.path, *module_path, *task.build_path, *task.preview_cache_path, 'debug')
1072        backup_path = task.backup_info.cache_debug
1073
1074        passed = validate_cache_file(task, preview_task_info, modified_files, cache_path, backup_path)
1075        return passed
1076
1077    @staticmethod
1078    def preview_compile(task, is_debug):
1079        test_name = "preview_compile"
1080        preview_task_info = options.CompilationInfo()
1081        task.preview_compilation_info[test_name] = preview_task_info
1082        clean_preview_cache(task)
1083        logging.info(f"==========> Running {test_name} for task: {task.name}")
1084
1085        passed, build_time = preview_mode_build(preview_task_info, task, is_debug, f'preview_compile_{test_name}')
1086        if passed:
1087            preview_task_info.result = options.TaskResult.passed
1088            preview_task_info.time = build_time
1089
1090        return passed
1091
1092    @staticmethod
1093    def compile_preview_build_entry_then_preview(task, is_debug):
1094        test_name = "build_entry_then_preview"
1095        preview_task_info = options.CompilationInfo()
1096        task.preview_compilation_info[test_name] = preview_task_info
1097        logging.info(f"==========> Running {test_name} for task: {task.name}")
1098
1099        build_entry, build_module_time = is_build_module_successful(task, is_debug, preview_task_info)
1100        if not build_entry:
1101            return
1102        build_preview, preview_build_time = preview_mode_build(preview_task_info, task, is_debug)
1103
1104        if build_preview:
1105            preview_task_info.result = options.TaskResult.passed
1106            preview_task_info.time = preview_build_time
1107
1108    @staticmethod
1109    def compile_preview_build_modify_file_name(task, is_debug):
1110        test_name = "build_modify_file_name"
1111        preview_task_info = options.CompilationInfo()
1112        task.preview_compilation_info[test_name] = preview_task_info
1113        logging.info(f"==========> Running {test_name} for task: {task.name}")
1114
1115        modify_file = os.path.join(task.path, *task.inc_modify_file)
1116        patch_content = options.configs.get('patch_content')
1117        patch_new_file_ts = patch_content.get('patch_new_file_ts')
1118        ts_file_name = patch_new_file_ts.get('name')
1119        ts_content = patch_new_file_ts.get('content')
1120        modify_dic = os.path.dirname(modify_file)
1121        ts_file = os.path.join(modify_dic, ts_file_name)
1122
1123        try:
1124            with open(ts_file, 'w', encoding='utf-8') as file:
1125                file.write(ts_content)
1126
1127            path_lines = patch_content.get('patch_lines_1')
1128            ts_path_lines = path_lines.get('ts')
1129            head_contnet = ts_path_lines.get('head')
1130            tail_contnet = ts_path_lines.get('tail')
1131            utils.add_content_to_file(modify_file, head_contnet, tail_contnet)
1132            first_build_passed, first_build_time = is_build_module_successful(task, is_debug, preview_task_info)
1133            if not first_build_passed:
1134                return
1135            ts_file_new_name = patch_new_file_ts.get('new_name')
1136            ts_new_file = os.path.join(modify_dic, ts_file_new_name)
1137            os.rename(ts_file, ts_new_file)
1138            second_build_passed, second_build_time = is_build_module_successful(task, is_debug, preview_task_info)
1139            if second_build_passed:
1140                preview_task_info.result = options.TaskResult.passed
1141                preview_task_info.time = first_build_time + second_build_time
1142        finally:
1143            os.remove(ts_new_file)
1144            utils.remove_content_from_file(modify_file, head_contnet, tail_contnet)
1145
1146    @staticmethod
1147    def compile_preview_build_generate_sourcemap(task, is_debug):
1148        test_name = "build_generate_sourcemap"
1149        preview_task_info = options.CompilationInfo()
1150        task.preview_compilation_info[test_name] = preview_task_info
1151        logging.info(f"==========> Running {test_name} for task: {task.name}")
1152
1153        build_preview, preview_build_time = preview_mode_build(preview_task_info, task, is_debug)
1154        if not build_preview:
1155            return
1156
1157        preview_path = os.path.join(task.path, *task.hap_module_path, *task.preview_path)
1158        preview_cache_path = os.path.join(preview_path, *task.preview_cache_path)
1159        source_map_path = os.path.join(preview_cache_path, 'debug', 'sourceMaps.json') if is_debug \
1160            else os.path.join(preview_cache_path, 'release', 'sourceMaps.json')
1161
1162        if not os.path.exists(source_map_path):
1163            logging.error(f'task: {task.name},source map not found in {source_map_path}')
1164            preview_task_info.result = options.TaskResult.failed
1165            preview_task_info.error_message = f"Source map not found in f{source_map_path}"
1166            return
1167
1168        preview_task_info.result = options.TaskResult.passed
1169        preview_task_info.time = preview_build_time
1170
1171    @staticmethod
1172    def compile_preview_build_tigger_incremental_build(task, is_debug):
1173        test_name = "tigger_incremental_build"
1174        preview_task_info = options.CompilationInfo()
1175        task.preview_compilation_info[test_name] = preview_task_info
1176        logging.info(f"==========> Running {test_name} for task: {task.name}")
1177
1178        backup_preview_cache(task, is_debug)
1179        inc_modify_file = os.path.join(task.path, *task.inc_modify_file)
1180        patch_line = options.configs.get('patch_content').get('patch_lines_2')
1181        utils.add_content_to_file(inc_modify_file, '', patch_line.get('tail'))
1182
1183        try:
1184            build_preview, preview_build_time = preview_mode_build(preview_task_info, task, is_debug)
1185            if not build_preview:
1186                return
1187
1188            passed = PreviewTest.validate_preview_incremental_file(task, preview_task_info, is_debug, inc_modify_file)
1189            if passed:
1190                preview_task_info.result = options.TaskResult.passed
1191                preview_task_info.time = preview_build_time
1192        finally:
1193            utils.remove_content_from_file(inc_modify_file, '', patch_line.get('tail'))
1194
1195    @staticmethod
1196    def compile_preview_build_has_arkui_error(task, is_debug):
1197        test_name = "has_arkui_error"
1198        preview_task_info = options.CompilationInfo()
1199        task.preview_compilation_info[test_name] = preview_task_info
1200        logging.info(f"==========> Running {test_name} for task: {task.name}")
1201
1202        add_or_delete_arkui_component(task, 1, True)
1203        try:
1204            preview_mode_build(preview_task_info, task, is_debug)
1205            cmd = get_preview_mode_compile_cmd(task, is_debug)
1206            [stdout, stderr] = compile_project(task, is_debug, cmd)
1207            expected_errors = options.configs.get('patch_content').get('arkui_patch').get('expected_errors')
1208            is_passed = is_get_expected_error(preview_task_info, stderr, expected_errors)
1209            if is_passed:
1210                preview_task_info.result = options.TaskResult.passed
1211        finally:
1212            add_or_delete_arkui_component(task, 0)
1213
1214    @staticmethod
1215    def compile_preview_build_sdk_path_has_special_char(task, is_debug):
1216        test_name = "sdk_path_has_special_char"
1217        preview_task_info = options.CompilationInfo()
1218        task.preview_compilation_info[test_name] = preview_task_info
1219        logging.info(f"==========> Running {test_name} for task: {task.name}")
1220
1221        sdk_path, api_version = '', ''
1222        profile_file = os.path.join(task.path, 'build-profile.json5')
1223        with open(profile_file, 'r', encoding='utf-8') as file:
1224            profile_data = json5.load(file)
1225            api_version = profile_data['app']['products'][0]['compatibleSdkVersion']
1226            if isinstance(api_version, int):
1227                openharmony_sdk_path = options.configs.get('deveco_openharmony_sdk_path')
1228                sdk_path = os.path.join(openharmony_sdk_path, str(api_version), 'ets', 'build-tools')
1229            else:
1230                harmonyos_sdk_path = options.configs.get('deveco_harmonyos_sdk_path')
1231                api_version_file_map = options.configs.get('api_version_file_name_map')
1232                file_name = api_version_file_map.get(api_version)
1233                sdk_path = os.path.join(harmonyos_sdk_path, file_name, 'openharmony', 'ets', 'build-tools')
1234        # Add a space to sdk file path
1235        last_folder_name = os.path.basename(sdk_path)
1236        new_folder_name = last_folder_name[:2] + " " + last_folder_name[2:]
1237        new_sdk_path = os.path.join(os.path.dirname(sdk_path), new_folder_name)
1238        try:
1239            os.rename(sdk_path, new_sdk_path)
1240            passed, build_time = preview_mode_build(preview_task_info, task, is_debug)
1241
1242            if passed:
1243                preview_task_info.result = options.TaskResult.passed
1244                preview_task_info.time = build_time
1245            else:
1246                preview_task_info.result = options.TaskResult.failed
1247                logging.error(f'Test failed due to adding spaces to the SDK path')
1248        finally:
1249            os.rename(new_sdk_path, sdk_path)
1250
1251    @staticmethod
1252    def compile_preview_build_modify_error_then_fix(task, is_debug):
1253        test_name = "modify_hello_world_then_fix"
1254        preview_task_info = options.CompilationInfo()
1255        task.preview_compilation_info[test_name] = preview_task_info
1256        logging.info(f"==========> Running {test_name} for task: {task.name}")
1257
1258        add_or_delete_arkui_component(task, 1)
1259        arkui_patch = options.configs.get('patch_content').get('arkui_patch')
1260        origin_text = arkui_patch.get('origin_text')
1261        error_text = arkui_patch.get('error_text')
1262        try:
1263            preview_modify_file = os.path.join(task.path, *task.inc_modify_file)
1264            with open(preview_modify_file, 'r+', encoding='utf-8') as file:
1265                old_content = file.read()
1266                new_content = old_content.replace(origin_text, error_text)
1267                file.seek(0)
1268                file.write(new_content)
1269                file.truncate()
1270            cmd = get_preview_mode_compile_cmd(task, is_debug)
1271            [stdout, stderr] = compile_project(task, is_debug, cmd)
1272            expected_errors = arkui_patch.get('expected_errors')
1273            is_passed = is_get_expected_error(preview_task_info, stderr, expected_errors)
1274            if not is_passed:
1275                logging.error(f'task: {task.name}, first build did not get expected errors, skip second build')
1276                return
1277            with open(preview_modify_file, 'r+', encoding='utf-8') as file:
1278                old_content = file.read()
1279                new_content = old_content.replace(error_text, origin_text)
1280                file.seek(0)
1281                file.write(new_content)
1282                file.truncate()
1283            is_build_successful, build_time = preview_mode_build(preview_task_info, task, is_debug)
1284            if is_build_successful:
1285                preview_task_info.result = options.TaskResult.passed
1286                preview_task_info.time = build_time
1287        finally:
1288            add_or_delete_arkui_component(task, 0)
1289
1290
1291class OtherTest:
1292    @staticmethod
1293    def is_abc_same_in_haps(hap_1, hap_2):
1294        hap_1_abc_files = []
1295        hap_2_abc_files = []
1296        with zipfile.ZipFile(hap_1) as zf1, zipfile.ZipFile(hap_2) as zf2:
1297            for file in zf1.namelist():
1298                if file.endswith('.abc'):
1299                    hap_1_abc_files.append(file)
1300            for file in zf2.namelist():
1301                if file.endswith('.abc'):
1302                    hap_2_abc_files.append(file)
1303
1304            hap_1_abc_files.sort()
1305            hap_2_abc_files.sort()
1306
1307            if len(hap_1_abc_files) != len(hap_2_abc_files):
1308                return False
1309
1310            for idx, abc_file in enumerate(hap_1_abc_files):
1311                with zf1.open(abc_file) as f1, zf2.open(hap_2_abc_files[idx]) as f2:
1312                    data1 = f1.read()
1313                    data2 = f2.read()
1314                    if data1 != data2:
1315                        return False
1316
1317        return True
1318
1319    @staticmethod
1320    def verify_binary_consistency(task):
1321        test_name = 'binary_consistency'
1322        test_info = options.CompilationInfo()
1323        task.other_tests[test_name] = test_info
1324        debug_consistency = True
1325        release_consistency = True
1326
1327        logging.info(f"==========> Running {test_name} for task: {task.name}")
1328        if options.arguments.hap_mode in ['all', 'release']:
1329            # will have at lease 1 output from full compile
1330            if len(task.backup_info.output_release) == 1:
1331                compile_project(task, False)
1332                backup_compile_output(task, False)
1333
1334            if len(task.backup_info.output_release) == 2:
1335                release_consistency = OtherTest.is_abc_same_in_haps(task.backup_info.output_release[0],
1336                                                                    task.backup_info.output_release[1])
1337            else:
1338                release_consistency = False
1339            logging.debug(f"release consistency: {release_consistency}")
1340
1341        if options.arguments.hap_mode in ['all', 'debug']:
1342            if len(task.backup_info.output_debug) == 1:
1343                compile_project(task, True)
1344                backup_compile_output(task, True)
1345
1346            if len(task.backup_info.output_debug) == 2:
1347                debug_consistency = OtherTest.is_abc_same_in_haps(task.backup_info.output_debug[0],
1348                                                                  task.backup_info.output_debug[1])
1349            else:
1350                debug_consistency = False
1351            logging.debug(f"debug consistency: {debug_consistency}")
1352
1353        if debug_consistency and release_consistency:
1354            test_info.result = options.TaskResult.passed
1355        else:
1356            test_info.result = options.TaskResult.failed
1357
1358    @staticmethod
1359    def execute_break_compile(task, is_debug):
1360        test_name = 'break_continue_compile'
1361        test_info = options.CompilationInfo()
1362        task.other_tests[test_name] = test_info
1363
1364        logging.info(f"==========> Running {test_name} for task: {task.name}")
1365        clean_compile(task)
1366        cmd = get_hvigor_compile_cmd(task, is_debug)
1367        logging.debug(f'cmd: {cmd}')
1368        logging.debug(f"cmd execution path {task.path}")
1369        process = subprocess.Popen(cmd, shell=False, cwd=task.path,
1370                                   stdout=subprocess.PIPE,
1371                                   stderr=subprocess.STDOUT)
1372
1373        for line in iter(process.stdout.readline, b''):
1374            if b'CompileArkTS' in line:
1375                logging.debug("terminate signal sent")
1376                process.send_signal(signal.SIGTERM)
1377                break
1378
1379        [stdout, stderr] = process.communicate(
1380            timeout=options.arguments.compile_timeout)
1381
1382        logging.debug("first compile: stdcout: %s",
1383                      stdout.decode('utf-8', errors="ignore"))
1384
1385        logging.debug("another compile")
1386        [stdout, stderr] = compile_project(task, is_debug)
1387
1388        [is_success, time_string] = is_compile_success(stdout)
1389        if not is_success:
1390            test_info.result = options.TaskResult.failed
1391            test_info.error_message = stderr
1392        else:
1393            passed = validate_compile_output(test_info, task, is_debug)
1394            if passed:
1395                test_info.result = options.TaskResult.passed
1396        if options.arguments.run_haps:
1397            run_compile_output(test_info, task, True, 'other_tests_break_continue_compile')
1398
1399    @staticmethod
1400    def compile_full_with_error(task, is_debug):
1401        test_name = 'compile_with_error'
1402        test_info = options.CompilationInfo()
1403        task.other_tests[test_name] = test_info
1404
1405        logging.info(f"==========> Running {test_name} for task: {task.name}")
1406        modify_file_item = task.inc_modify_file
1407        modify_file = os.path.join(task.path, *modify_file_item)
1408        modify_file_backup = modify_file + ".bak"
1409        shutil.copyfile(modify_file, modify_file_backup)
1410
1411        patch_lines_error = options.configs.get(
1412            'patch_content').get('patch_lines_error')
1413        with open(modify_file, 'a', encoding='utf-8') as file:
1414            file.write(patch_lines_error.get('content'))
1415
1416        [stdout, stderr] = compile_project(task, is_debug)
1417        expected_errors = patch_lines_error.get('expected_error')
1418
1419        passed = False
1420        for expected_error in expected_errors:
1421            if expected_error in stderr:
1422                passed = True
1423                break
1424
1425        if passed:
1426            test_info.result = options.TaskResult.passed
1427        else:
1428            test_info.result = options.TaskResult.failed
1429            test_info.error_message = f"expected error message: {expected_errors}, but got {stderr}"
1430
1431        shutil.move(modify_file_backup, modify_file)
1432
1433    @staticmethod
1434    def compile_with_exceed_length(task, is_debug):
1435        test_name = 'compile_with_exceed_length'
1436        test_info = options.CompilationInfo()
1437        task.other_tests[test_name] = test_info
1438
1439        logging.info(f"==========> Running {test_name} for task: {task.name}")
1440        # get build-profile.json5
1441        profile_file = os.path.join(
1442            task.path, *task.hap_module_path, 'build-profile.json5')
1443        profile_file_backup = profile_file + ".bak"
1444        shutil.copyfile(profile_file, profile_file_backup)
1445
1446        with open(profile_file, 'r', encoding='utf-8') as file:
1447            profile_data = json5.load(file)
1448
1449        long_str = 'default1234567890123456789012345678901234567890123456789012345678901234567890123456789' + \
1450                   '012345678901234567890123456789'
1451        logging.debug("long_str: %s", long_str)
1452        profile_data['targets'][0]['name'] = long_str
1453
1454        with open(profile_file, 'w', encoding='utf-8') as file:
1455            json5.dump(profile_data, file)
1456
1457        cmd = get_hvigor_compile_cmd(task, is_debug, task.hap_module, long_str)
1458        [stdout, stderr] = compile_project(task, is_debug, cmd)
1459        # Only the Windows platform has a length limit
1460        if utils.is_windows():
1461            expected_error_message = f"Unknown module '{long_str}' in the command line"
1462
1463            if expected_error_message in stderr:
1464                test_info.result = options.TaskResult.passed
1465            else:
1466                test_info.result = options.TaskResult.failed
1467                test_info.error_message = f"expected error message: {expected_error_message}, but got {stderr}"
1468        else:
1469            [is_success, time_string] = is_compile_success(stdout)
1470            if not is_success:
1471                test_info.result = options.TaskResult.failed
1472                test_info.error_message = stderr
1473            else:
1474                passed = validate_compile_output(test_info, task, is_debug)
1475                if passed:
1476                    test_info.result = options.TaskResult.passed
1477
1478        shutil.move(profile_file_backup, profile_file)
1479
1480    @staticmethod
1481    def compile_ohos_test(task):
1482        test_name = 'ohos_test'
1483        test_info = options.CompilationInfo()
1484        task.other_tests[test_name] = test_info
1485
1486        logging.info(f"==========> Running {test_name} for task: {task.name}")
1487        # ohosTest has only debug mode
1488        cmd = [*get_hvigor_path(), '--mode', 'module',
1489               '-p', 'module=entry@ohosTest', 'assembleHap']
1490        [stdout, stderr] = compile_project(task, True, cmd)
1491        [is_success, time_string] = is_compile_success(stdout)
1492        if not is_success:
1493            test_info.result = options.TaskResult.failed
1494            test_info.error_message = stderr
1495        else:
1496            output_file = get_compile_output_file_path(task, '', options.OutputType.unsigned)
1497            output_dir = os.path.dirname(output_file)
1498            output_file_name = os.path.basename(output_file)
1499
1500            ohos_test_str = 'ohosTest'
1501            output_file_name_items = output_file_name.split(
1502                '-')  # hap name format: entry-default-signed.hap
1503            # ohosTest hap format: entry-ohosTest-signed.hap
1504            output_file_name_items[-2] = ohos_test_str
1505            output_file_name = '-'.join(output_file_name_items)
1506
1507            output_dir_items = output_dir.split(os.path.sep)
1508            output_dir_items[-1] = ohos_test_str
1509            if utils.is_windows():
1510                # for windows, need to add an empty string to mark between disk identifier and path
1511                output_dir_items.insert(1, os.path.sep)
1512            elif utils.is_mac():
1513                output_dir_items.insert(0, os.path.sep)
1514            ohos_test_output_file = os.path.join(
1515                *output_dir_items, output_file_name)
1516
1517            passed = validate_compile_output(
1518                test_info, task, True, ohos_test_output_file)
1519            if passed:
1520                test_info.result = options.TaskResult.passed
1521
1522
1523def disasm_abc(task, abc_file):
1524    if not os.path.exists(task.ark_disasm_path):
1525        logging.error("ark_disasm executable not found")
1526        return ''
1527
1528    pa_file = abc_file + '.pa'
1529    cmd = [task.ark_disasm_path, '--verbose', abc_file, pa_file]
1530    logging.debug(f'cmd: {cmd}')
1531    process = subprocess.Popen(
1532        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1533    [stdout, stderr] = process.communicate(
1534        timeout=options.arguments.compile_timeout)
1535
1536    logging.debug("disasm stdcout: %s",
1537                  stdout.decode('utf-8', errors="ignore"))
1538    logging.warning("disasm: stdcerr: %s",
1539                    stderr.decode('utf-8', errors="ignore"))
1540
1541    return pa_file
1542
1543
1544def is_abc_debug_info_correct(task, abc_file, is_debug):
1545    pa_file = disasm_abc(task, abc_file)
1546    if not os.path.exists(pa_file):
1547        logging.error(f"pa file not exist: {pa_file}")
1548        return False
1549
1550    debug_info_block_str = 'LOCAL_VARIABLE_TABLE'
1551    has_debug_info_block = False
1552    with open(pa_file, 'r', encoding='utf-8') as pa:
1553        line = pa.readline()
1554        while line:
1555            if debug_info_block_str in line.strip():
1556                has_debug_info_block = True
1557                break
1558            line = pa.readline()
1559
1560    if is_debug:
1561        return has_debug_info_block
1562    else:
1563        return not has_debug_info_block
1564
1565
1566def validate_output_for_jsbundle(info, task, uncompressed_output_path, is_debug):
1567    abc_files = []
1568    for root, dirs, files in os.walk(uncompressed_output_path):
1569        for file in files:
1570            if file.endswith('.abc'):
1571                abc_files.append(os.path.join(root, file))
1572
1573    total_size = 0
1574    for file in abc_files:
1575        total_size += os.path.getsize(
1576            os.path.join(uncompressed_output_path, file))
1577        if 'compatible8' not in task.type and not is_abc_debug_info_correct(task, file, is_debug):
1578            # skip compatible8 outputs as disasm may failed
1579            info.result = options.TaskResult.failed
1580            info.error_message = f"{file} debug info not correct"
1581            return False
1582
1583    if total_size == 0:
1584        info.result = options.TaskResult.failed
1585        info.error_message = "abc not found or abc size is 0"
1586        return False
1587    else:
1588        info.abc_size = total_size
1589
1590    if is_debug:
1591        for file in abc_files:
1592            sourcemap_file = file.replace('.abc', '.js.map')
1593            if not os.path.exists(os.path.join(uncompressed_output_path, sourcemap_file)):
1594                info.result = options.TaskResult.failed
1595                info.error_message = "sourcemap not found"
1596                return False
1597
1598    return True
1599
1600
1601def validate_output_for_esmodule(info, task, uncompressed_output_path, is_debug, module = ''):
1602    abc_generated_path = os.path.join(uncompressed_output_path, 'ets')
1603
1604    modules_abc_path = os.path.join(abc_generated_path, 'modules.abc')
1605    if not os.path.exists(modules_abc_path):
1606        info.result = options.TaskResult.failed
1607        info.error_message = "modules.abc not found"
1608        return False
1609
1610    modules_abc_size = os.path.getsize(modules_abc_path)
1611    if modules_abc_size <= 0:
1612        info.result = options.TaskResult.failed
1613        info.error_message = "modules.abc size is 0"
1614        return False
1615    if not is_abc_debug_info_correct(task, modules_abc_path, is_debug):
1616        info.result = options.TaskResult.failed
1617        info.error_message = "modules.abc debug info not correct"
1618        return False
1619    info.abc_size = modules_abc_size
1620
1621    if 'widget' in task.type:
1622        widget_abc_path = os.path.join(abc_generated_path, 'widgets.abc')
1623        if not os.path.exists(widget_abc_path):
1624            info.result = options.TaskResult.failed
1625            info.error_message = "widgets.abc not found"
1626            return False
1627
1628        widgets_abc_size = os.path.getsize(widget_abc_path)
1629        if widgets_abc_size <= 0:
1630            info.result = options.TaskResult.failed
1631            info.error_message = "widgets.abc size is 0"
1632            return False
1633        if not is_abc_debug_info_correct(task, widget_abc_path, is_debug):
1634            info.result = options.TaskResult.failed
1635            info.error_message = "widgets.abc debug info not correct"
1636            return False
1637        info.abc_size += widgets_abc_size
1638
1639    if is_debug:
1640        sourcemap_path = abc_generated_path
1641    elif module == 'Hsp':
1642        sourcemap_path = os.path.join(
1643            task.path, *task.hsp_module_path, *(task.build_path), *(task.cache_path), 'release')
1644    else:
1645        sourcemap_path = os.path.join(
1646            task.path, *task.hap_module_path, *(task.build_path), *(task.cache_path), 'release')
1647    sourcemap_file = os.path.join(sourcemap_path, 'sourceMaps.map')
1648    if not os.path.exists(sourcemap_file):
1649        info.result = options.TaskResult.failed
1650        info.error_message = "sourcemap not found"
1651        return False
1652
1653    return True
1654
1655
1656def collect_compile_time(time_string):
1657    time_min = 0.0
1658    time_second = 0.0
1659    time_millisecond = 0.0
1660
1661    time_items = time_string.split()
1662    for idx, item in enumerate(time_items):
1663        if item == 'min':
1664            time_min = float(time_items[idx - 1]) * 60
1665        if item == 's':
1666            time_second = float(time_items[idx - 1])
1667        if item == 'ms':
1668            time_millisecond = round(float(time_items[idx - 1]) / 1000, 3)
1669
1670    return round(time_min + time_second + time_millisecond, 3)
1671
1672
1673def get_compile_output_file_path(task, module, output_type):
1674    module_path = utils.get_module_path(task, module)
1675    output_path = utils.get_output_path(task, module, output_type)
1676    output_file = os.path.join(task.path, *module_path, *task.build_path, *output_path)
1677
1678    return output_file
1679
1680
1681def validate_compile_output_har(info, task, is_debug, output_file='', module=''):
1682    uncompressed_output_file = get_output_uncompressed_file(task, info, module, options.OutputType.har)
1683
1684    if not uncompressed_output_file:
1685        return False
1686
1687    return True
1688
1689
1690def validate_compile_file_bytecode_har(task, info, module):
1691    module_path = utils.get_module_path(task, module)
1692    uncompressed_path = get_output_uncompressed_file(task, info, module, options.OutputType.har)
1693    modules_abc_path = os.path.join(uncompressed_path, 'ets', 'modules.abc')
1694    if not os.path.exists(modules_abc_path):
1695        return False
1696    is_success = find_file_by_suffix(['.d.ets'], uncompressed_path,
1697                                     'Index.ets', '')
1698    if not is_success:
1699        return False
1700    ets_path = os.path.join(task.path, *module_path, 'src', 'main', 'ets')
1701    for root, dirs, files in os.walk(ets_path):
1702        relative_path = os.path.relpath(root, os.path.join(task.path, *module_path))
1703        for file in files:
1704            if file.endswith('.ets'):
1705                extension_list = ['.d.ets']
1706            elif file.endswith('.ts'):
1707                extension_list = ['.d.ts']
1708            else:
1709                continue
1710            is_success = find_file_by_suffix(extension_list, uncompressed_path, file, relative_path)
1711            if not is_success:
1712                return False
1713    return True
1714
1715
1716def validate_compile_file_har(task, info, module):
1717    module_path = utils.get_module_path(task, module)
1718    uncompressed_path = get_output_uncompressed_file(task, info, module, options.OutputType.har)
1719    is_success = find_file_by_suffix(['.d.ets', '.js'], uncompressed_path,
1720                                     'Index.ets', '')
1721    if not is_success:
1722        return False
1723    ets_path = os.path.join(task.path, *module_path, 'src', 'main', 'ets')
1724    for root, dirs, files in os.walk(ets_path):
1725        relative_path = os.path.relpath(root, os.path.join(task.path, *module_path))
1726        for file in files:
1727            if file.endswith('.ets'):
1728                extension_list = ['.d.ets', '.js']
1729            elif file.endswith('.ts'):
1730                extension_list = ['.d.ts', '.js']
1731            elif file.endswith('.js'):
1732                extension_list = ['.js']
1733            else:
1734                continue
1735            is_success = find_file_by_suffix(extension_list, uncompressed_path, file, relative_path)
1736            if not is_success:
1737                return False
1738    return True
1739
1740
1741def find_file_by_suffix(extension_list, uncompressed_path, filename, relative_path):
1742    origin_extension = os.path.splitext(filename)[-1]
1743    for extension in extension_list:
1744        new_filename = filename.replace(origin_extension, extension)
1745        new_filepath = os.path.join(uncompressed_path, relative_path, new_filename)
1746        if not os.path.exists(new_filepath):
1747            return False
1748    return True
1749
1750
1751def validate_compile_output(info, task, is_debug, output_file='', module=''):
1752    passed = False
1753
1754    if output_file == '':
1755        output_file = get_compile_output_file_path(task, module, options.OutputType.unsigned)
1756
1757    if module == 'BytecodeHar':
1758        # Har declaration files are not generated in debug mode.
1759        if is_debug:
1760            return True
1761        return validate_compile_file_bytecode_har(task, info, module)
1762    if module == 'Har':
1763        if is_debug:
1764            return True
1765        return validate_compile_file_har(task, info, module)
1766
1767    uncompressed_output_file = output_file + '.uncompressed'
1768    if not os.path.exists(output_file):
1769        logging.error("output file for task %s not exists: %s",
1770                      task.name, output_file)
1771        passed = False
1772
1773        info.result = options.TaskResult.failed
1774        info.error_message = f"{module} not found"
1775        return passed
1776    try:
1777        with zipfile.ZipFile(output_file, 'r') as zip_ref:
1778            zip_ref.extractall(uncompressed_output_file)
1779    except Exception as e:
1780        logging.error(f"unzip exception: {e}")
1781        logging.error(
1782            f"uncompressed output file for task {task.name} failed. output file: {output_file}")
1783        passed = False
1784
1785        info.result = options.TaskResult.failed
1786        info.error_message = "Hap uncompressed failed, cannot exam build products"
1787        return passed
1788
1789    if utils.is_esmodule(task.type):
1790        passed = validate_output_for_esmodule(
1791            info, task, uncompressed_output_file, is_debug, module)
1792    else:
1793        passed = validate_output_for_jsbundle(
1794            info, task, uncompressed_output_file, is_debug)
1795
1796    shutil.rmtree(uncompressed_output_file)
1797
1798    return passed
1799
1800
1801def run_compile_output(info, task, is_debug, picture_name='', module=''):
1802    hsp_output_path = task.backup_info.hsp_signed_output_debug if is_debug \
1803        else task.backup_info.hsp_signed_output_release
1804    if len(hsp_output_path) < 1:
1805        backup_hsp_module_compile_signed_package(task, is_debug)
1806
1807    picture_suffix = 'debug'
1808    if not is_debug:
1809        picture_suffix = 'release'
1810    picture_name = f'{picture_name}_{picture_suffix}'
1811
1812    runtime_passed = False
1813    # There is a certain probability of failure when taking screenshots
1814    try_times = 5
1815    for i in range(try_times):
1816        utils.get_running_screenshot(task, picture_name, is_debug, module)
1817        time.sleep(2)
1818        if utils.verify_runtime(task, picture_name):
1819            runtime_passed = True
1820            break
1821        else:
1822            logging.debug(f'get the preview picture failed, retry: {i}/{try_times}')
1823
1824    if not runtime_passed:
1825        logging.error(f'The runtime of the {task.name} is inconsistent with the reference screenshot,'
1826                      f' when running {picture_name}')
1827        info.runtime_result = options.TaskResult.failed
1828        info.error_message = "The runtime result is inconsistent with the reference"
1829    else:
1830        info.runtime_result = options.TaskResult.passed
1831
1832    return runtime_passed
1833
1834
1835# verify preview build picture
1836def verify_preview_picture(info, task, is_debug, picture_name, module=''):
1837    return True
1838
1839
1840def is_compile_success(compile_stdout):
1841    pattern = r"BUILD SUCCESSFUL in (\d+ min )?(\d+ s )?(\d+ ms)?"
1842    match_result = re.search(pattern, compile_stdout)
1843    if not match_result:
1844        return [False, '']
1845
1846    return [True, match_result.group(0)]
1847
1848
1849def validate(compilation_info, task, is_debug, stdout, stderr, picture_name='', output_file=''):
1850    info = {}
1851    if is_debug:
1852        info = compilation_info.debug_info
1853    else:
1854        info = compilation_info.release_info
1855
1856    # ret_code will be 1 if there's stderr, use "COMPILE SUCCESSFUL" as a flag to make a judge
1857    [is_success, time_string] = is_compile_success(stdout)
1858    if not is_success:
1859        info.result = options.TaskResult.failed
1860        info.error_message = stderr
1861        return False
1862
1863    passed = validate_compile_output(info, task, is_debug, output_file)
1864
1865    if options.arguments.run_haps and picture_name:
1866        runtime_passed = run_compile_output(info, task, is_debug, picture_name)
1867
1868    if passed:
1869        info.time = collect_compile_time(time_string)
1870        info.result = options.TaskResult.passed
1871
1872    return passed
1873
1874
1875def get_hvigor_path():
1876    hvigor = []
1877    deveco_path = options.configs.get('deveco_path')
1878    node_js_path = os.path.join(deveco_path, 'tools', 'node')
1879    if utils.is_windows():
1880        node_exe_path = os.path.join(node_js_path, 'node.exe')
1881        hvigor_script_path = os.path.join(deveco_path, 'tools', 'hvigor', 'bin', 'hvigorw.js')
1882        hvigor = [node_exe_path, hvigor_script_path]
1883    else:
1884        hvigor = [os.path.join(deveco_path, 'hvigorw')]
1885        utils.add_executable_permission(hvigor)
1886    return hvigor
1887
1888
1889def get_hvigor_compile_cmd(task, is_debug, module='', module_name='', module_target='default'):
1890    cmd = [*get_hvigor_path()]
1891    build_mode = 'debug' if is_debug else 'release'
1892    if not module:
1893        module = 'Hap'
1894    if module == 'BytecodeHar':
1895        module = 'Har'
1896    if not module_name:
1897        module_name = utils.get_module_name(task, module)
1898    cmd.extend(['--mode', 'module', '-p', 'product=default', '-p', f'module={module_name}@{module_target}', '-p',
1899                f'buildMode={build_mode}', f'assemble{module}',
1900                '--info', '--analyze=advanced', '--module_name', '--incremental', '--daemon'])
1901    return cmd
1902
1903
1904def get_preview_mode_compile_cmd(task, is_debug, module='', module_target='default'):
1905    cmd = [*get_hvigor_path()]
1906    build_mode = 'debug' if is_debug else 'release'
1907    module_name = utils.get_module_name(task, module)
1908    page = os.path.join(*task.inc_modify_file)
1909    if module == 'Har':
1910        page = os.path.join(task.har_modify_file)
1911    elif module == 'Hsp':
1912        page = os.path.join(task.hsp_modify_file)
1913
1914    cmd.extend(['--mode', 'module', '-p', f'module={module_name}@{module_target}', '-p', 'product=default',
1915                '-p', f'buildMode={build_mode}', '-p', 'buildRoot=.preview', '-p', '-p',
1916                f'previewer.replace.page={page}', '-p', 'pageType=page', '-p', 'compileResInc=true',
1917                '-p', 'previewMode=true', 'PreviewBuild', '--watch', '--analyze', '--parallel',
1918                '--incremental', '--daemon'])
1919
1920    return cmd
1921
1922
1923def compile_project(task, is_debug, cmd=None):
1924    if cmd is None:
1925        cmd = get_hvigor_compile_cmd(task, is_debug)
1926
1927    logging.debug(f'cmd: {cmd}')
1928    logging.debug(f"cmd execution path {task.path}")
1929    process = subprocess.Popen(cmd, shell=False, cwd=task.path,
1930                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1931    stdout, stderr = process.communicate(
1932        timeout=options.arguments.compile_timeout)
1933    stdout_utf8 = stdout.decode("utf-8", errors="ignore")
1934    stderr_utf8 = stderr.decode("utf-8", errors="ignore")
1935    logging.debug(f"cmd stdout: {stdout_utf8}")
1936    logging.debug(f"cmd stderr: {stderr_utf8}")
1937
1938    return [stdout_utf8, stderr_utf8]
1939
1940
1941def preview_mode_build(info, task, is_debug, picture_name='', module=''):
1942    cmd = get_preview_mode_compile_cmd(task, is_debug, module)
1943    [stdout, stderr] = compile_project(task, is_debug, cmd)
1944
1945    [is_success, time_string] = is_compile_success(stdout)
1946    if not is_success:
1947        info.result = options.TaskResult.failed
1948        info.error_message = f'task: {task.name}, Preview compile failed'
1949        logging.error(f'task: {task.name}, Preview compile failed')
1950        return False, ''
1951
1952    is_get_correct_pic = verify_preview_picture(info, task, is_debug, picture_name, module)
1953    if not is_get_correct_pic:
1954        info.result = options.TaskResult.failed
1955        info.error_message = f'task: {task.name}, Get incorrect picture'
1956        logging.error(f'task: {task.name}, Get incorrect picture')
1957        return False, ''
1958
1959    time_string = collect_compile_time(time_string)
1960    return True, time_string
1961
1962
1963def clean_compile(task):
1964    cmd = [*get_hvigor_path(), 'clean']
1965    logging.debug(f'cmd: {cmd}')
1966    logging.debug(f"cmd execution path {task.path}")
1967    process = subprocess.Popen(cmd, shell=False, cwd=task.path,
1968                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1969    out, err = process.communicate(timeout=options.arguments.compile_timeout)
1970
1971
1972def clean_preview_cache(task, module=''):
1973    module_path = utils.get_module_path(task, module)
1974
1975    preview_cache_path = os.path.join(task.path, *module_path, '.preview')
1976    if os.path.exists(preview_cache_path):
1977        shutil.rmtree(preview_cache_path)
1978        logging.debug(f"delete preview cache successfully on this path: {preview_cache_path}")
1979
1980
1981def sync_project(task):
1982    ohpm_bat_path = os.path.join(options.configs.get('deveco_path'), 'tools', 'ohpm', 'bin', 'ohpm.bat')
1983    ohpm_install_cmd_suffix = ' install --all --registry https://repo.harmonyos.com/ohpm/ --strict_ssl true'
1984    ohpm_install_cmd = f'"{ohpm_bat_path}"' + ohpm_install_cmd_suffix
1985    cmd_suffix = '--sync -p product=default -p buildMode=debug --analyze --parallel --incremental --daemon'
1986    cmd = [*get_hvigor_path(), cmd_suffix]
1987    logging.debug(f"cmd execution path {task.path}")
1988    logging.debug(f'ohpm install cmd: {ohpm_install_cmd}')
1989    subprocess.Popen(ohpm_install_cmd, shell=False, cwd=task.path,
1990                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1991    logging.debug(f'sync cmd: {cmd}')
1992    subprocess.Popen(cmd, shell=False, cwd=task.path,
1993                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1994    # If you don't wait it may cause: current process status is busy, cannot start a build action
1995    time.sleep(5)
1996
1997
1998def compile_full(task, is_debug):
1999    if not FullTest.full_compile(task, is_debug):
2000        return False
2001
2002    FullTest.compile_full_import_ordinary_ohpm_package(task, is_debug)
2003    FullTest.compile_full_import_special_ohpm_package(task, is_debug)
2004    FullTest.compile_full_import_static_library(task, is_debug)
2005    FullTest.compile_full_import_share_library(task, is_debug)
2006    FullTest.compile_full_import_so_file(task, is_debug)
2007    FullTest.compile_full_has_syntax_error_in_js(task, is_debug)
2008    FullTest.compile_full_use_normalize_ohmurl(task, is_debug)
2009    FullTest.compile_full_module_name_is_inconsistent(task, is_debug)
2010
2011    return True
2012
2013
2014def compile_incremental(task, is_debug):
2015    logging.info(
2016        f"==========> Running task: {task.name} in incremental compilation")
2017    clean_compile(task)
2018    [stdout, stderr] = compile_project(task, is_debug)
2019
2020    [is_success, time_string] = is_compile_success(stdout)
2021    if not is_success:
2022        logging.error(
2023            "Incremental compile failed due to first compile failed!")
2024        return
2025
2026    if options.arguments.compile_mode == 'incremental':
2027        passed = validate(task.full_compilation_info,
2028                          task, is_debug, stdout, stderr, 'incremental_compile_first')
2029        if not passed:
2030            logging.error(
2031                "Incremental compile failed due to first compile failed!")
2032            return
2033
2034    backup_compile_output(task, is_debug)
2035    backup_compile_cache(task, is_debug)
2036
2037    IncrementalTest.compile_incremental_no_modify(task, is_debug)
2038    IncrementalTest.compile_incremental_add_oneline(task, is_debug)
2039    IncrementalTest.compile_incremental_add_file(task, is_debug)
2040    IncrementalTest.compile_incremental_add_nonexistent_file(task, is_debug)
2041    IncrementalTest.compile_incremental_delete_file(task, is_debug)
2042
2043    IncrementalTest.compile_incremental_build_modify_error_then_fix(task, is_debug)
2044    IncrementalTest.compile_incremental_build_add_error_page(task, is_debug)
2045    IncrementalTest.compile_incremental_build_add_error_non_page(task, is_debug)
2046    IncrementalTest.compile_incremental_build_entry_then_har(task, is_debug)
2047    IncrementalTest.compile_incremental_build_har_then_entry(task, is_debug)
2048    IncrementalTest.compile_incremental_build_entry_then_hsp(task, is_debug)
2049    IncrementalTest.compile_incremental_build_hsp_then_entry(task, is_debug)
2050    IncrementalTest.compile_incremental_build_hsp_then_ohos(task, is_debug)
2051    IncrementalTest.compile_incremental_build_entry_then_ohos(task, is_debug)
2052    IncrementalTest.compile_incremental_build_entry_then_preview_build(task, is_debug)
2053
2054    # These tests require modifying the test files and synchronizing the project,
2055    # which may result in unexpected modifications
2056    IncrementalTest.compile_incremental_reverse_hap_mode(task, is_debug)
2057    IncrementalTest.compile_incremental_modify_module_name(task, is_debug)
2058    IncrementalTest.compile_incremental_build_modify_sdk_version(task, is_debug)
2059
2060
2061def compile_bytecode_har(task, is_debug):
2062    logging.info(f"==========> Running task: {task.name} in bytecode har compilation")
2063    clean_compile(task)
2064
2065    BytecodeHarTest.build_bytecode_har(task, is_debug)
2066    BytecodeHarTest.build_har_then_bytecode_har(task, is_debug)
2067    BytecodeHarTest.import_bytecode_static_library(task, is_debug)
2068
2069
2070def compile_external(task, is_debug):
2071    logging.info(f"==========> Running task: {task.name} in external compilation")
2072    clean_compile(task)
2073
2074    ExternalTest.import_external_share_library(task, is_debug)
2075    ExternalTest.import_external_static_library(task, is_debug)
2076    ExternalTest.full_compile_external_static_library(task, is_debug)
2077    ExternalTest.full_compile_external_share_library(task, is_debug)
2078
2079
2080def compile_preview(task, is_debug):
2081    clean_preview_cache(task)
2082    if not PreviewTest.preview_compile(task, is_debug):
2083        logging.error('Preview build failed, skip other preview tests')
2084        return
2085
2086    PreviewTest.compile_preview_build_entry_then_preview(task, is_debug)
2087    PreviewTest.compile_preview_build_modify_file_name(task, is_debug)
2088    PreviewTest.compile_preview_build_generate_sourcemap(task, is_debug)
2089    PreviewTest.compile_preview_build_tigger_incremental_build(task, is_debug)
2090    PreviewTest.compile_preview_build_has_arkui_error(task, is_debug)
2091    PreviewTest.compile_preview_build_sdk_path_has_special_char(task, is_debug)
2092    PreviewTest.compile_preview_build_modify_error_then_fix(task, is_debug)
2093
2094
2095def backup_compile_output(task, is_debug):
2096    backup_path = task.backup_info.cache_path
2097    if not os.path.exists(backup_path):
2098        os.mkdir(backup_path)
2099
2100    if is_debug:
2101        if len(task.backup_info.output_debug) == 2:
2102            return
2103
2104        backup_output_path = os.path.join(backup_path, 'output', 'debug')
2105        if not os.path.exists(backup_output_path):
2106            os.makedirs(backup_output_path)
2107
2108    else:
2109        if len(task.backup_info.output_release) == 2:
2110            return
2111
2112        backup_output_path = os.path.join(backup_path, 'output', 'release')
2113        if not os.path.exists(backup_output_path):
2114            os.makedirs(backup_output_path)
2115
2116    output_file = get_compile_output_file_path(task, '', options.OutputType.unsigned)
2117    shutil.copy(output_file, backup_output_path)
2118    backup_output = os.path.join(
2119        backup_output_path, os.path.basename(output_file))
2120    backup_time_output = backup_output + '-' + utils.get_time_string()
2121    shutil.move(backup_output, backup_time_output)
2122
2123    if is_debug:
2124        task.backup_info.output_debug.append(backup_time_output)
2125    else:
2126        task.backup_info.output_release.append(backup_time_output)
2127
2128
2129def backup_hsp_module_compile_signed_package(task, is_debug):
2130    if not options.arguments.run_haps:
2131        return
2132
2133    backup_path = task.backup_info.cache_path
2134    if not os.path.exists(backup_path):
2135        os.mkdir(backup_path)
2136
2137    cmd = get_hvigor_compile_cmd(task, is_debug, 'Hsp')
2138    stdout, stderr = compile_project(task, is_debug, cmd)
2139    passed, build_time = is_compile_success(stdout)
2140    if not passed:
2141        logging.debug(f'cmd: {cmd}')
2142        logging.debug(f"cmd execution path {task.path}")
2143        return
2144
2145    external_task = ExternalTest.get_external_task()
2146    cmd = get_hvigor_compile_cmd(external_task, is_debug, 'Hsp')
2147    stdout, stderr = compile_project(task, is_debug, cmd)
2148    passed, build_time = is_compile_success(stdout)
2149    if not passed:
2150        logging.debug(f'cmd: {cmd}')
2151        logging.debug(f"cmd execution path {task.path}")
2152        return
2153
2154    backup_output_path = os.path.join(backup_path, 'output', 'debug') if is_debug \
2155        else os.path.join(backup_path, 'output', 'release')
2156    if not os.path.exists(backup_output_path):
2157        os.makedirs(backup_output_path)
2158    output_file = get_compile_output_file_path(task, 'Hsp', options.OutputType.signed)
2159    backup_output = os.path.join(backup_output_path, os.path.basename(output_file))
2160    shutil.copy(output_file, backup_output_path)
2161    output_file = get_compile_output_file_path(external_task, 'Hsp', options.OutputType.signed)
2162    external_hsp_backup_output = os.path.join(backup_output_path, os.path.basename(output_file))
2163    shutil.copy(output_file, backup_output_path)
2164
2165    if is_debug:
2166        task.backup_info.hsp_signed_output_debug = backup_output
2167        task.backup_info.external_hsp_signed_output_debug = external_hsp_backup_output
2168    else:
2169        task.backup_info.hsp_signed_output_release = backup_output
2170        task.backup_info.external_hsp_signed_output_release = external_hsp_backup_output
2171
2172
2173def backup_preview_output(task, is_debug, module):
2174    backup_path = task.backup_info.cache_path
2175    if not os.path.exists(backup_path):
2176        os.mkdir(backup_path)
2177
2178    if is_debug:
2179        # We only need two backup files to compare in the ohter tests.
2180        if len(task.backup_info.preview_output_debug) == 2:
2181            return
2182
2183        backup_preview_output_path = os.path.join(backup_path, 'preview', 'debug')
2184        if not os.path.exists(backup_preview_output_path):
2185            os.makedirs(backup_preview_output_path)
2186
2187    if is_debug:
2188        if len(task.backup_info.preview_output_release) == 2:
2189            return
2190
2191        backup_preview_output_path = os.path.join(backup_path, 'preview', 'release')
2192        if not os.path.exists(backup_preview_output_path):
2193            os.makedirs(backup_preview_output_path)
2194
2195    preview_output_path = os.path.join((task.path, module, '.preview'))
2196    shutil.copy(preview_output_path, backup_path)
2197    backup_preview_output_dir = os.path.join(
2198        backup_preview_output_path, os.path.basename(preview_output_path))
2199    preview_backup_time_out = backup_preview_output_path + utils.get_time_string()
2200    shutil.move(backup_preview_output_dir, preview_backup_time_out)
2201    if is_debug:
2202        task.backup_info.output_debug.append(preview_backup_time_out)
2203    else:
2204        task.backup_info.output_release.append(preview_backup_time_out)
2205
2206
2207def backup_compile_cache(task, is_debug):
2208    backup_path = task.backup_info.cache_path
2209    if not os.path.exists(backup_path):
2210        os.mkdir(backup_path)
2211
2212    backup_cache_path = os.path.join(backup_path, 'cache')
2213    if not os.path.exists(backup_cache_path):
2214        os.mkdir(backup_cache_path)
2215    cache_files = os.path.join(
2216        task.path, *task.hap_module_path, *(task.build_path), *(task.cache_path))
2217
2218    if is_debug:
2219        if task.backup_info.cache_debug != '':
2220            return
2221
2222        cache_files = os.path.join(cache_files, 'debug')
2223        backup_cache_file = os.path.join(backup_cache_path, 'debug')
2224        shutil.copytree(cache_files, backup_cache_file)
2225        task.backup_info.cache_debug = backup_cache_file
2226    else:
2227        if task.backup_info.cache_release != '':
2228            return
2229
2230        cache_files = os.path.join(cache_files, 'release')
2231        backup_cache_file = os.path.join(backup_cache_path, 'release')
2232        shutil.copytree(cache_files, backup_cache_file)
2233        task.backup_info.cache_release = backup_cache_file
2234
2235
2236def backup_preview_cache(task, is_debug, module=''):
2237    backup_path = task.backup_info.cache_path
2238    if not os.path.exists(backup_path):
2239        os.mkdir(backup_path)
2240
2241    preview_backup_cache_path = os.path.join(backup_path, 'preview_cache')
2242    if not os.path.exists(preview_backup_cache_path):
2243        os.mkdir(preview_backup_cache_path)
2244    module_path = utils.get_module_path(task, module)
2245    preview_cache_files = os.path.join(
2246        task.path, *module_path, *task.preview_path, *task.preview_cache_path)
2247
2248    if is_debug:
2249        if task.backup_info.preview_cache_debug != '':
2250            return
2251
2252        preview_cache_files = os.path.join(preview_cache_files, 'debug')
2253        preview_backup_cache_file = os.path.join(preview_backup_cache_path, 'debug')
2254        shutil.copytree(preview_cache_files, preview_backup_cache_file)
2255        task.backup_info.preview_cache_debug = preview_backup_cache_file
2256    else:
2257        if task.backup_info.preview_cache_release != '':
2258            return
2259
2260        preview_cache_files = os.path.join(preview_cache_files, 'release')
2261        preview_backup_cache_file = os.path.join(preview_backup_cache_path, 'release')
2262        shutil.copytree(preview_cache_files, preview_backup_cache_file)
2263        task.backup_info.preview_cache_release = preview_backup_cache_file
2264
2265
2266def execute_full_compile(task):
2267    logging.info(
2268        f"==========> Running task: {task.name} in full compilation")
2269    passed = True
2270    if options.arguments.hap_mode in ['all', 'release']:
2271        passed = passed and compile_full(task, False)
2272        clean_compile(task)
2273    if options.arguments.hap_mode in ['all', 'debug']:
2274        passed = passed and compile_full(task, True)
2275        clean_compile(task)
2276    return passed
2277
2278
2279def execute_incremental_compile(task):
2280    logging.info(
2281        f"==========> Running task: {task.name} in incremental compilation")
2282    if options.arguments.hap_mode in ['all', 'release']:
2283        compile_incremental(task, False)
2284        clean_compile(task)
2285    if options.arguments.hap_mode in ['all', 'debug']:
2286        compile_incremental(task, True)
2287        clean_compile(task)
2288
2289
2290def execute_bytecode_har_compile(task):
2291    logging.info(
2292        f"==========> Running task: {task.name} in bytecode har compilation")
2293    if options.arguments.hap_mode in ['all', 'release']:
2294        compile_bytecode_har(task, False)
2295        clean_compile(task)
2296    if options.arguments.hap_mode in ['all', 'debug']:
2297        compile_bytecode_har(task, True)
2298        clean_compile(task)
2299
2300
2301def execute_external_compile(task):
2302    logging.info(
2303        f"==========> Running task: {task.name} in external compilation")
2304    if options.arguments.hap_mode in ['all', 'release']:
2305        compile_external(task, False)
2306        clean_compile(task)
2307    if options.arguments.hap_mode in ['all', 'debug']:
2308        compile_external(task, True)
2309        clean_compile(task)
2310
2311
2312def execute_preview_compile(task):
2313    logging.info(
2314        f"==========> Running task: {task.name} in preview compilation")
2315    compile_preview(task, True)
2316    clean_compile(task)
2317
2318
2319def clean_backup(task):
2320    if os.path.exists(task.backup_info.cache_path):
2321        shutil.rmtree(task.backup_info.cache_path)
2322    return
2323
2324
2325def is_build_module_successful(task, is_debug, info, module='', picture_name=''):
2326    cmd = get_hvigor_compile_cmd(task, is_debug, module)
2327    [stdout, stderr] = compile_project(task, is_debug, cmd)
2328    [is_success, time_string] = is_compile_success(stdout)
2329    if not is_success:
2330        info.result = options.TaskResult.failed
2331        module_name = utils.get_module_name(task, module)
2332        info.error_message = f'Compile failed due to build {module_name} module.'
2333        logging.error(f'build {module_name} failed')
2334        return False, ''
2335
2336    passed = validate_compile_output(info, task, is_debug, '', module)
2337
2338    if options.arguments.run_haps and picture_name:
2339        runtime_passed = run_compile_output(info, task, is_debug, picture_name)
2340
2341    time_string = collect_compile_time(time_string)
2342    return passed, time_string
2343
2344
2345def is_get_expected_error(info, stderr, expect_errors):
2346    passed = False
2347    for expect_error in expect_errors:
2348        if expect_error in stderr:
2349            passed = True
2350            break
2351
2352    if not passed:
2353        logging.error(f"True message: {stderr}, didn't get expected error message: {expect_errors}")
2354        info.result = options.TaskResult.failed
2355        info.error_message = f'Expected error message: {expect_errors}'
2356
2357    return passed
2358
2359
2360def is_build_ohos_test_successful(task, info):
2361    cmd = [*get_hvigor_path(), '--mode', 'module',
2362           '-p', 'module=entry@ohosTest', 'assembleHap']
2363    [stdout, stderr] = compile_project(task, True, cmd)
2364    [is_success, time_string] = is_compile_success(stdout)
2365
2366    if not is_success:
2367        info.result = options.TaskResult.failed
2368        info.error_message = f'Compile failed due to build ohos test.'
2369        logging.error(f'build ohos test failed')
2370        return False, ''
2371
2372    time_string = collect_compile_time(time_string)
2373    return True, time_string
2374
2375
2376def modify_main_pages_json(task, page_path, reverse):
2377    main_pages_json_path = os.path.join(task.path, *task.main_pages_json_path)
2378    with open(main_pages_json_path, 'r+', encoding='utf-8') as json_file:
2379        json_data = json.load(json_file)
2380        pages_dic = json_data['src']
2381        if reverse:
2382            if page_path not in pages_dic:
2383                pages_dic.append(page_path)
2384            logging.info(f'Page {page_path} is already in the list')
2385        else:
2386            if page_path in pages_dic:
2387                pages_dic.remove(page_path)
2388            else:
2389                logging.error(f'Page {page_path} not found in the list')
2390
2391        json_file.seek(0)
2392        json.dump(json_data, json_file, indent=2)
2393        json_file.truncate()
2394    sync_project(task)
2395
2396
2397def add_or_delete_page(task, reverse, is_error=False):
2398    patch_content = options.configs.get(
2399        'patch_content').get('patch_new_file_ets')
2400    patch_lines_error = options.configs.get(
2401        'patch_content').get('patch_lines_error')
2402    page_path = patch_content.get('path')
2403    new_file_page = patch_content.get('name')
2404    new_file_content = patch_content.get('component')
2405    new_file_directory = os.path.dirname(os.path.join(task.path, *task.inc_modify_file))
2406    new_file_path = os.path.join(new_file_directory, new_file_page)
2407
2408    if reverse:
2409        modify_main_pages_json(task, page_path, 1)
2410        with open(new_file_path, 'a', encoding='utf-8') as file:
2411            file.write(new_file_content)
2412            if is_error:
2413                error_content = patch_lines_error.get('content')
2414                file.write(error_content)
2415    else:
2416        if is_error:
2417            utils.remove_content_from_file(new_file_path, '', patch_lines_error.get('content'))
2418        else:
2419            modify_main_pages_json(task, page_path, 0)
2420            os.remove(new_file_path)
2421
2422
2423def add_or_delete_js_file(task, reverse, is_error=False):
2424    modify_file = os.path.join(task.path, *task.inc_modify_file)
2425    patch_content = options.configs.get('patch_content')
2426
2427    patch_new_file_js = patch_content.get('patch_new_file_js')
2428    js_file_name = patch_new_file_js.get('name')
2429    js_content = patch_new_file_js.get('content')
2430    modify_dic = os.path.dirname(modify_file)
2431    js_file_path = os.path.join(modify_dic, js_file_name)
2432    path_lines = patch_content.get('patch_lines_1').get('js')
2433
2434    if reverse:
2435        with open(js_file_path, 'a', encoding='utf-8') as file:
2436            file.write(js_content)
2437            if is_error:
2438                error_content = patch_content.get('patch_lines_error').get('content')
2439                file.write(error_content)
2440
2441        utils.add_content_to_file(modify_file, path_lines.get('head'), path_lines.get('tail'))
2442    else:
2443        if is_error:
2444            tail_content = patch_content.get('patch_lines_error').get('content')
2445            utils.remove_content_from_file(js_file_path, '', tail_content)
2446        else:
2447            os.remove(js_file_path)
2448            utils.remove_content_from_file(modify_file, path_lines.get('head'), path_lines.get('tail'))
2449
2450
2451def modify_normalize_ohmurl_options(task, reverse):
2452    oh_package_json_path = os.path.join(task.path, 'build-profile.json5')
2453    with open(oh_package_json_path, 'r+', encoding='utf-8') as json_file:
2454        json_data = json5.load(json_file)
2455        products = json_data['app']['products'][0]
2456        if 'buildOption' not in products:
2457            products['buildOption'] = {}
2458        build_option = products['buildOption']
2459        if 'strictMode' not in build_option:
2460            build_option['strictMode'] = {}
2461        strict_mode = build_option['strictMode']
2462        # if 'useNormalizedOHMUrl' not in products:
2463        if reverse:
2464            strict_mode['useNormalizedOHMUrl'] = True
2465        else:
2466            strict_mode['useNormalizedOHMUrl'] = False
2467
2468        json_file.seek(0)
2469        json5.dump(json_data, json_file, indent=4, ensure_ascii=False)
2470        json_file.truncate()
2471    sync_project(task)
2472
2473
2474def modify_module_import_handle(task, module, reverse):
2475    modify_file = os.path.join(task.path, *task.inc_modify_file)
2476    modify_file_patch = options.configs.get('patch_content').get('patch_lines_1').get(module.lower())
2477    if reverse:
2478        utils.add_content_to_file(modify_file, modify_file_patch.get('head'),
2479                                  modify_file_patch.get('tail'))
2480    else:
2481        utils.remove_content_from_file(modify_file, modify_file_patch.get('head'),
2482                                       modify_file_patch.get('tail'))
2483
2484
2485@contextmanager
2486def manage_module_import_and_export_handle(task, module_name):
2487    modify_module_import_handle(task, module_name, 1)
2488    try:
2489        yield
2490    finally:
2491        modify_module_import_handle(task, module_name, 0)
2492
2493
2494@contextmanager
2495def manage_bytecode_har_dependency(task, is_debug, info, module):
2496    modify_module_import_handle(task, module, 1)
2497    is_build_module_successful(task, is_debug, info, 'BytecodeHar')
2498    modify_bytecode_module_dependency(task, module, 1)
2499    try:
2500        yield
2501    finally:
2502        modify_bytecode_module_dependency(task, module, 0)
2503        modify_module_import_handle(task, module, 0)
2504
2505
2506def modify_bytecode_module_dependency(task, module, reverse):
2507    oh_package_json_path = os.path.join(task.path, task.hap_module, 'oh-package.json5')
2508    with open(oh_package_json_path, 'r+', encoding='utf-8') as json_file:
2509        json_data = json5.load(json_file)
2510        dependencies_dic = json_data["dependencies"]
2511        patch_lines = options.configs.get('patch_content').get('patch_lines_1')
2512        dependency_name = utils.extract_library_names(patch_lines.get(module.lower()).get('head'))
2513        if reverse:
2514            dependency_path = os.path.join(task.har_module, *task.build_path, *task.har_output_path_har)
2515        else:
2516            dependency_path = utils.get_module_name(task, module)
2517        dependencies_dic[dependency_name] = os.path.normpath(f"file:../{dependency_path}")
2518        json_file.seek(0)
2519        json.dump(json_data, json_file, indent=4)
2520        json_file.truncate()
2521    sync_project(task)
2522
2523
2524def modify_bytecode_har_config(task, reverse):
2525    modify_normalize_ohmurl_options(task, reverse)
2526    module_path = utils.get_module_path(task, 'Har')
2527    har_build_profile_json_path = os.path.join(task.path, *module_path, 'build-profile.json5')
2528    with open(har_build_profile_json_path, 'r+', encoding='utf-8') as json_file:
2529        json_data = json5.load(json_file)
2530        build_option_dic = json_data["buildOption"]
2531        if reverse:
2532            build_option_dic["arkOptions"] = {"byteCodeHar": True}
2533        else:
2534            build_option_dic["arkOptions"] = {"byteCodeHar": False}
2535        json_file.seek(0)
2536        json.dump(json_data, json_file, indent=4)
2537        json_file.truncate()
2538    sync_project(task)
2539
2540
2541def validate_cache_file(task, info, modified_files, cache_path, backup_path):
2542    cache_extension = utils.get_cache_extension(task.type)
2543    modified_cache_files = []
2544    # modified_files is a list of file with relative path to .../debug/release
2545    for file in modified_files:
2546        name, ext = os.path.splitext(file)
2547        modified_cache_files.append(name + cache_extension)
2548
2549    for root, dirs, files in os.walk(cache_path):
2550        for file in files:
2551            if not file.endswith(cache_extension):
2552                continue
2553            file_absolute_path = os.path.join(root, file)
2554            file_relative_path = os.path.relpath(file_absolute_path, cache_path)
2555            backup_file = os.path.join(backup_path, file_relative_path)
2556
2557            if not os.path.exists(backup_file):
2558                logging.debug(f"backup file not exits: {backup_file}")
2559                continue
2560
2561            if utils.is_file_timestamps_same(file_absolute_path, backup_file):
2562                continue
2563
2564            logging.debug(f"found file ${file_relative_path} changed")
2565            is_file_in_list = IncrementalTest.is_file_in_modified_files(
2566                task.type, file_relative_path, modified_cache_files)
2567            logging.debug(f"is file in list: {is_file_in_list}")
2568            if not is_file_in_list:
2569                logging.debug(f"Unexpected file modified: {file_relative_path}")
2570                info.result = options.TaskResult.failed
2571                info.error_message = f'Incremental compile found unexpected file timestamp changed. \
2572                                             Changed file: {file_relative_path}'
2573                return False
2574
2575    return True
2576
2577
2578def get_output_uncompressed_file(task, info, module, output_type=options.OutputType.unsigned):
2579    output_file = get_compile_output_file_path(task, module, output_type)
2580    uncompressed_output_file = output_file + '.uncompressed'
2581    if not os.path.exists(output_file):
2582        logging.error(f"outputfile: {output_file} for task: {task.name} not found")
2583        info.result = options.TaskResult.failed
2584        return ''
2585
2586    try:
2587        if utils.check_zip_file(output_file):
2588            with zipfile.ZipFile(output_file, 'r') as zip_ref:
2589                zip_ref.extractall(uncompressed_output_file)
2590        elif utils.check_gzip_file(output_file):
2591            with tarfile.open(output_file, 'r:gz') as tar_ref:
2592                tar_ref.extractall(uncompressed_output_file)
2593        else:
2594            logging.error(
2595                f"task: {task.name},not the expected file type for output file: {output_file}")
2596            info.result = options.TaskResult.failed
2597            return ''
2598    except Exception as e:
2599        logging.error(e)
2600        logging.error(
2601            f"uncompressed output file for task {task.name} failed. output file: {output_file}")
2602        info.result = options.TaskResult.failed
2603        return ''
2604    if module == 'Har' or module == 'BytecodeHar':
2605        uncompressed_output_file = os.path.join(uncompressed_output_file, 'package')
2606    return uncompressed_output_file
2607
2608
2609def get_disasm_abc_file(task, info, module, uncompressed_output_file=''):
2610    if not uncompressed_output_file:
2611        uncompressed_output_file = get_output_uncompressed_file(task, info, module)
2612
2613    if not os.path.exists(uncompressed_output_file):
2614        info.result = options.TaskResult.failed
2615        info.error_message = "uncompressed file not found"
2616        return ''
2617
2618    abc_path = ''
2619    if utils.is_esmodule(task.type):
2620        abc_path = os.path.join(uncompressed_output_file, 'ets', 'modules.abc')
2621
2622    if not os.path.exists(abc_path):
2623        info.result = options.TaskResult.failed
2624        info.error_message = "abc file not found"
2625        return ''
2626
2627    modules_abc_size = os.path.getsize(abc_path)
2628    if modules_abc_size <= 0:
2629        info.result = options.TaskResult.failed
2630        info.error_message = "abc file size is 0"
2631        return ''
2632
2633    return disasm_abc(task, abc_path)
2634
2635
2636def is_package_modules_to_module_abc(task, pa_file, module):
2637    module_str = f'{task.hap_module}@{module}'
2638
2639    return utils.file_contains_specified_fields(pa_file, module_str)
2640
2641
2642def is_normalized_ohm_url(task, is_debug, info):
2643    build_path = os.path.join(task.path, *task.hap_module_path, *task.build_path)
2644    cache_path = os.path.join(build_path, *task.cache_path, 'debug') if is_debug \
2645        else os.path.join(build_path, *task.cache_path, 'release')
2646    inc_modify_file = os.path.join(*task.inc_modify_file)
2647    dir_name, base_name = os.path.split(inc_modify_file)
2648    file_name, _ = os.path.splitext(base_name)
2649    ts_file_name = f'{file_name}.ts'
2650    ts_file_path = os.path.join(cache_path, dir_name, ts_file_name)
2651    url_string = '@normalized'
2652
2653    passed = utils.file_contains_specified_fields(ts_file_path, url_string)
2654    if not passed:
2655        info.result = options.TaskResult.failed
2656        logging.error(f'{ts_file_path} does not contain {url_string}')
2657
2658    return passed
2659
2660
2661def is_npm_txt_included_ohpm_package(info, task, is_debug, package_name):
2662    cache_file = os.path.join(task.path, *task.hap_module_path, *task.build_path, *task.cache_path)
2663    npm_entries_path = os.path.join(cache_file, 'debug', 'npmEntries.txt') if is_debug else \
2664        os.path.join(cache_file, 'release', 'npmEntries.txt')
2665
2666    if not os.path.exists(npm_entries_path):
2667        logging.error(f'{npm_entries_path} does not exist')
2668        info.result = options.TaskResult.failed
2669        return False
2670
2671    is_included = utils.file_contains_specified_fields(npm_entries_path, package_name)
2672    if not is_included:
2673        info.result = options.TaskResult.failed
2674        logging.error(f'{npm_entries_path} does not contain {package_name}')
2675
2676    return is_included
2677
2678
2679def modify_sdk_version(task, api_version):
2680    build_profile_json_file = os.path.join(task.path, 'build-profile.json5')
2681    with open(build_profile_json_file, 'r+', encoding='utf-8') as json_file:
2682        json_data = json5.load(json_file)
2683        products = json_data["app"]["products"][0]
2684        compatible_sdk_version = products["compatibleSdkVersion"]
2685        if isinstance(compatible_sdk_version, str):
2686            api_version_file_name_map = options.configs.get('api_version_file_name_map')
2687            version_str = str(api_version)
2688            for key in api_version_file_name_map.keys():
2689                if version_str in key:
2690                    version_str = key
2691                    break
2692            products["compatibleSdkVersion"] = version_str
2693        else:
2694            products["compatibleSdkVersion"] = api_version
2695
2696        json_file.seek(0)
2697        json5.dump(json_data, json_file, indent=2)
2698        json_file.truncate()
2699
2700    sync_project(task)
2701
2702
2703# Preview files require significant modifications for comparison purposes.
2704# Here, we will create a relatively simple page for testing.
2705def add_or_delete_arkui_component(task, reverse, is_error=False):
2706    preview_modify_file = os.path.join(task.path, *task.inc_modify_file)
2707    preview_modify_file_bak = preview_modify_file + '.bak'
2708
2709    if reverse:
2710        shutil.copy(preview_modify_file, preview_modify_file_bak)
2711        with open(preview_modify_file, 'r+', encoding='utf-8') as file:
2712            arkui_patch = options.configs.get('patch_content').get('arkui_patch')
2713            content = arkui_patch.get('content')
2714            pattern = re.compile(r'(build\(\)\s*\{\s*)([^{}]*)(\s*\})', re.DOTALL)
2715            component = arkui_patch.get('error_component') if is_error else arkui_patch.get('component')
2716            replacement = r'\1\2{}\3'.format(component)
2717            new_content = re.sub(pattern, replacement, content)
2718            file.seek(0)
2719            file.write(new_content)
2720            file.truncate()
2721    else:
2722        os.remove(preview_modify_file)
2723        os.rename(preview_modify_file_bak, preview_modify_file)
2724
2725
2726def execute(test_tasks):
2727    for task in test_tasks:
2728        try:
2729            logging.info(f"======> Running task: {task.name}")
2730            if options.arguments.compile_mode in ['all', 'full']:
2731                if not execute_full_compile(task):
2732                    logging.info("Full compile failed, skip other tests!")
2733                    continue
2734
2735            if options.arguments.compile_mode in ['all', 'incremental']:
2736                execute_incremental_compile(task)
2737
2738            if options.arguments.compile_mode in ['all', 'bytecode_har']:
2739                execute_bytecode_har_compile(task)
2740
2741            if options.arguments.compile_mode in ['all', 'external']:
2742                execute_external_compile(task)
2743
2744            if options.arguments.compile_mode in ['all', 'preview']:
2745                execute_preview_compile(task)
2746
2747            OtherTest.verify_binary_consistency(task)
2748
2749            # for these tests, use one hapMode maybe enough
2750            is_debug = True if options.arguments.hap_mode == 'debug' else False
2751            OtherTest.execute_break_compile(task, is_debug)
2752            if 'error' in task.type:
2753                OtherTest.compile_full_with_error(task, is_debug)
2754
2755            if 'exceed_length_error' in task.type:
2756                OtherTest.compile_with_exceed_length(task, is_debug)
2757
2758            if 'ohosTest' in task.type:
2759                OtherTest.compile_ohos_test(task)
2760
2761            logging.info(f"======> Running task: {task.name} finished")
2762        except Exception as e:
2763            logging.exception(e)
2764        finally:
2765            clean_backup(task)
2766