1e509ee18Sopenharmony_ci#!/usr/bin/env python3 2e509ee18Sopenharmony_ci# -*- coding: utf-8 -*- 3e509ee18Sopenharmony_ci""" 4e509ee18Sopenharmony_ciCopyright (c) 2024 Huawei Device Co., Ltd. 5e509ee18Sopenharmony_ciLicensed under the Apache License, Version 2.0 (the "License"); 6e509ee18Sopenharmony_ciyou may not use this file except in compliance with the License. 7e509ee18Sopenharmony_ciYou may obtain a copy of the License at 8e509ee18Sopenharmony_ci 9e509ee18Sopenharmony_ci http://www.apache.org/licenses/LICENSE-2.0 10e509ee18Sopenharmony_ci 11e509ee18Sopenharmony_ciUnless required by applicable law or agreed to in writing, software 12e509ee18Sopenharmony_cidistributed under the License is distributed on an "AS IS" BASIS, 13e509ee18Sopenharmony_ciWITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14e509ee18Sopenharmony_ciSee the License for the specific language governing permissions and 15e509ee18Sopenharmony_cilimitations under the License. 16e509ee18Sopenharmony_ci 17e509ee18Sopenharmony_ciDescription: Action words of hdc fport. 18e509ee18Sopenharmony_ci""" 19e509ee18Sopenharmony_ci 20e509ee18Sopenharmony_ciimport logging 21e509ee18Sopenharmony_ciimport subprocess 22e509ee18Sopenharmony_ci 23e509ee18Sopenharmony_ci 24e509ee18Sopenharmony_ciclass Fport(object): 25e509ee18Sopenharmony_ci retry_times = 3 26e509ee18Sopenharmony_ci increase_step = 7 27e509ee18Sopenharmony_ci 28e509ee18Sopenharmony_ci @classmethod 29e509ee18Sopenharmony_ci def fport_connect_server(cls, port, pid, bundle_name): 30e509ee18Sopenharmony_ci for _ in range(Fport.retry_times): 31e509ee18Sopenharmony_ci cmd = ['hdc', 'fport', f'tcp:{port}', f'ark:{pid}@{bundle_name}'] 32e509ee18Sopenharmony_ci logging.info('fport connect server: ' + ' '.join(cmd)) 33e509ee18Sopenharmony_ci result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 34e509ee18Sopenharmony_ci logging.info(result.stdout.strip()) 35e509ee18Sopenharmony_ci if 'TCP Port listen failed' not in result.stdout.decode('utf-8'): 36e509ee18Sopenharmony_ci assert result.stdout.decode('utf-8').strip() == 'Forwardport result:OK' 37e509ee18Sopenharmony_ci return port 38e509ee18Sopenharmony_ci else: # The port is occupied 39e509ee18Sopenharmony_ci port += Fport.increase_step 40e509ee18Sopenharmony_ci return -1 41e509ee18Sopenharmony_ci 42e509ee18Sopenharmony_ci @classmethod 43e509ee18Sopenharmony_ci def fport_debugger_server(cls, port, pid, tid=0): 44e509ee18Sopenharmony_ci for _ in range(Fport.retry_times): 45e509ee18Sopenharmony_ci if tid == 0: 46e509ee18Sopenharmony_ci cmd = ['hdc', 'fport', f'tcp:{port}', f'ark:{pid}@Debugger'] 47e509ee18Sopenharmony_ci else: 48e509ee18Sopenharmony_ci cmd = ['hdc', 'fport', f'tcp:{port}', f'ark:{pid}@{tid}@Debugger'] 49e509ee18Sopenharmony_ci logging.info('fport_debugger_server: ' + ' '.join(cmd)) 50e509ee18Sopenharmony_ci result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 51e509ee18Sopenharmony_ci logging.info(result.stdout.strip()) 52e509ee18Sopenharmony_ci if 'TCP Port listen failed' not in result.stdout.decode('utf-8'): 53e509ee18Sopenharmony_ci assert result.stdout.decode('utf-8').strip() == 'Forwardport result:OK' 54e509ee18Sopenharmony_ci return port 55e509ee18Sopenharmony_ci else: # The port is occupied 56e509ee18Sopenharmony_ci port += Fport.increase_step 57e509ee18Sopenharmony_ci return -1 58e509ee18Sopenharmony_ci 59e509ee18Sopenharmony_ci @classmethod 60e509ee18Sopenharmony_ci def clear_fport(cls): 61e509ee18Sopenharmony_ci list_fport_cmd = ['hdc', 'fport', 'ls'] 62e509ee18Sopenharmony_ci list_fport_result = subprocess.run(list_fport_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 63e509ee18Sopenharmony_ci logging.info(list_fport_result.stdout.strip()) 64e509ee18Sopenharmony_ci list_fport_out = list_fport_result.stdout.decode('utf-8') 65e509ee18Sopenharmony_ci if 'Empty' in list_fport_out: 66e509ee18Sopenharmony_ci return 67e509ee18Sopenharmony_ci for fport_item in [item for item in list_fport_out.split('[Forward]') if item != '\r\n']: 68e509ee18Sopenharmony_ci un_fport_command = (['hdc', 'fport', 'rm'] + [fport_item.split(' ')[1].split(' ')[0]] + 69e509ee18Sopenharmony_ci [fport_item.split(' ')[1].split(' ')[1]]) 70e509ee18Sopenharmony_ci un_fport_result = subprocess.run(un_fport_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 71e509ee18Sopenharmony_ci logging.info(un_fport_result.stdout.strip()) 72e509ee18Sopenharmony_ci assert 'success' in un_fport_result.stdout.decode('utf-8') 73