#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import shutil
import sys
import os
import time
import threading
import subprocess
import re
from enum import Enum

from containers.status import throw_exception
from exceptions.ohos_exception import OHOSException
from services.interface.build_file_generator_interface import BuildFileGeneratorInterface
from containers.arg import Arg, ModuleType
from util.system_util import SystemUtil
from util.io_util import IoUtil
from util.log_util import LogUtil
from util.component_util import ComponentUtil
from resources.global_var import CURRENT_OHOS_ROOT, set_hpm_check_info


class CMDTYPE(Enum):
    """hpm sub-commands that Hpm.execute_hpm_cmd knows how to dispatch."""
    BUILD = 1
    INSTALL = 2
    PACKAGE = 3
    PUBLISH = 4
    UPDATE = 5


class Hpm(BuildFileGeneratorInterface):
    """BuildFileGeneratorInterface implementation that shells out to the hpm executable."""

    def __init__(self):
        """Description: Initialise the base class and locate the hpm executable
        @parameter: none
        @return: none
        """
        super().__init__()
        self._regist_hpm_path()

    def run(self):
        """Description: Run the hpm "build" sub-command
        @parameter: none
        @return: none
        """
        self.execute_hpm_cmd(CMDTYPE.BUILD)

    @throw_exception
    def execute_hpm_cmd(self, cmd_type: CMDTYPE, **kwargs):
        """Description: Dispatch to the handler of the requested hpm sub-command
        @parameter: "cmd_type": a CMDTYPE member
        @parameter: "kwargs": accepted for interface compatibility, not forwarded
        @return: whatever the selected handler returns
        @raise: OHOSException (code '3001') when cmd_type is not a known CMDTYPE
        """
        if cmd_type == CMDTYPE.BUILD:
            return self._execute_hpm_build_cmd()
        if cmd_type == CMDTYPE.INSTALL:
            return self._execute_hpm_install_cmd()
        if cmd_type == CMDTYPE.PACKAGE:
            return self._execute_hpm_pack_cmd()
        if cmd_type == CMDTYPE.PUBLISH:
            return self._execute_hpm_publish_cmd()
        if cmd_type == CMDTYPE.UPDATE:
            return self._execute_hpm_update_cmd()
        raise OHOSException(
            'You are tring to use an unsupported hpm cmd type "{}"'.format(cmd_type), '3001')

    @throw_exception
    def _check_hpm_version(self, cmd, current_hpm):
        """Description: Run the given search command and record a hint when the
                        installed hpm differs from the version it reports
        @parameter: "cmd": argument list of the command to run
        @parameter: "current_hpm": version string of the installed hpm
        @return: none
        """
        try:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        except OSError:
            # The check is only advisory: if the command cannot even be
            # started (e.g. the npm binary is absent), skip it silently.
            return

        try:
            out, _ = proc.communicate(timeout=5)
        except subprocess.TimeoutExpired:
            # Kill the child and then reap it; without the second
            # communicate() the process and its pipes are left dangling.
            proc.kill()
            proc.communicate()
            return

        if proc.returncode != 0:
            return

        # Capture the fifth '|'-separated column of the row that starts with
        # "@ohos/hpm-cli"; that column is taken as the latest published version.
        pattern = r'^@ohos/hpm-cli\s*\|(?:[^|]*\|){3}([^|]*)'
        latest_hpm_version = ""
        for line in out.splitlines():
            match = re.match(pattern, line)
            if match:
                latest_hpm_version = match.group(1).strip()
                break

        if latest_hpm_version and latest_hpm_version != current_hpm:
            set_hpm_check_info("your current hpm version is not the latest, consider update hpm: bash build/prebuilts_config.sh")

    @throw_exception
    def _regist_hpm_path(self):
        """Description: Get hpm excutable path and regist it, then start a
                        background thread that checks for a newer hpm version
        @parameter: none
        @return: none
        @raise: OHOSException (code '0001') when no hpm executable is found
        """
        prebuilt_hpm = os.path.join(CURRENT_OHOS_ROOT, "prebuilts/hpm/node_modules/.bin/hpm")
        hpm_path = shutil.which("hpm")
        if hpm_path and os.path.exists(hpm_path):
            self.exec = hpm_path
        elif os.path.exists(prebuilt_hpm):
            self.exec = prebuilt_hpm
        else:
            # hpm_path is None when the PATH lookup fails, so report the
            # prebuilt location that was probed instead of "None".
            raise OHOSException(
                'There is no hpm executable file at {}'.format(prebuilt_hpm), '0001')

        current_hpm_version = subprocess.run([self.exec, "-V"], capture_output=True, text=True).stdout.strip()
        npm_path = os.path.join(CURRENT_OHOS_ROOT, "prebuilts/build-tools/common/nodejs/current/bin/npm")
        # Build the argv list directly: joining into one string and splitting
        # on whitespace would break a source root whose path contains spaces.
        cmd = [npm_path, "search", "hpm-cli", "--registry", "https://registry.npmjs.org/"]
        thread = threading.Thread(target=self._check_hpm_version, args=(cmd, current_hpm_version))
        thread.start()

    @throw_exception
    def _execute_hpm_build_cmd(self, **kwargs):
        """Description: Run "hpm build" with the registered flags, logging to
                        out/<variant>/build.log
        @parameter: "kwargs": unused
        @return: none
        @raise: ValueError when the converted flags contain no "--variant"
        """
        hpm_build_cmd = [self.exec, "build"] + self._convert_flags()
        # The value of a flag is the element right after the flag itself.
        variant = hpm_build_cmd[hpm_build_cmd.index("--variant") + 1]
        logpath = os.path.join('out', variant, 'build.log')
        if os.path.exists(logpath):
            # Keep the previous log, renamed with its modification time.
            mtime = os.stat(logpath).st_mtime
            os.rename(logpath, '{}/build.{}.log'.format(os.path.dirname(logpath), mtime))
        SystemUtil.exec_command(hpm_build_cmd, log_path=logpath)

    @throw_exception
    def _execute_hpm_install_cmd(self, **kwargs):
        """Description: Run "hpm install" with the registered flags
        @parameter: "kwargs": unused
        @return: none
        """
        hpm_install_cmd = [self.exec, "install"] + self._convert_flags()
        SystemUtil.exec_command(hpm_install_cmd)

    @throw_exception
    def _execute_hpm_pack_cmd(self, **kwargs):
        """Description: Run "hpm pack -t" with the registered flags
        @parameter: "kwargs": unused
        @return: none
        """
        hpm_pack_cmd = [self.exec, "pack", "-t"] + self._convert_flags()
        SystemUtil.exec_command(hpm_pack_cmd)

    @throw_exception
    def _execute_hpm_publish_cmd(self, **kwargs):
        """Description: Run "hpm publish -t" with the registered flags
        @parameter: "kwargs": unused
        @return: none
        """
        hpm_publish_cmd = [self.exec, "publish", "-t"] + self._convert_flags()
        SystemUtil.exec_command(hpm_publish_cmd)

    @throw_exception
    def _execute_hpm_update_cmd(self, **kwargs):
        """Description: Run "hpm update" with the registered flags
        @parameter: "kwargs": unused
        @return: none
        """
        hpm_update_cmd = [self.exec, "update"] + self._convert_flags()
        SystemUtil.exec_command(hpm_update_cmd)

    def _convert_args(self) -> list:
        """Description: Convert all registed args into a list
        @parameter: none
        @return: list of all registed args, each formatted as "key=value";
                 values of any other type than bool/str/int/list are skipped
        """
        args_list = []

        for key, value in self.args_dict.items():
            # bool must be tested before int because bool is a subclass of int.
            if isinstance(value, bool):
                args_list.append('{}={}'.format(key, str(value).lower()))

            elif isinstance(value, str):
                args_list.append('{}="{}"'.format(key, value))

            elif isinstance(value, int):
                args_list.append('{}={}'.format(key, value))

            elif isinstance(value, list):
                args_list.append('{}="{}"'.format(key, "&&".join(value)))

        return args_list

    def _convert_flags(self) -> list:
        """Description: Convert all registed flags into a list
        @parameter: none
        @return: list of all registed flags in command-line form
        """
        flags_list = []

        for key, value in self.flags_dict.items():
            # The part name is passed positionally, without a flag name.
            if key == "part_name":
                flags_list.append(str(value))
            else:
                if value == '':
                    # A flag with an empty value is emitted as a bare switch.
                    flags_list.append('--{}'.format(key))
                elif key == 'path':
                    flags_list.extend(['--{}'.format(key), '{}'.format(str(value))])
                else:
                    # Only the flag name is lower-cased, never its value.
                    flags_list.extend(['--{}'.format(key).lower(), '{}'.format(str(value))])

        return flags_list

    def _check_parts_validity(self, components: list):
        """Description: Verify that every given part has a bundle file
        @parameter: "components": list of part names to check
        @return: none
        @raise: OHOSException listing the parts for which
                ComponentUtil.search_bundle_file returned a falsy result
        """
        illegal_components = [
            component for component in components
            if not ComponentUtil.search_bundle_file(component)
        ]
        if illegal_components:
            raise OHOSException('ERROR argument "--parts": Invalid parts "{}". '.format(illegal_components))