1e509ee18Sopenharmony_ci#!/usr/bin/env python3
2e509ee18Sopenharmony_ci# -*- coding: utf-8 -*-
3e509ee18Sopenharmony_ci"""
4e509ee18Sopenharmony_ciCopyright (c) 2024 Huawei Device Co., Ltd.
5e509ee18Sopenharmony_ciLicensed under the Apache License, Version 2.0 (the "License");
6e509ee18Sopenharmony_ciyou may not use this file except in compliance with the License.
7e509ee18Sopenharmony_ciYou may obtain a copy of the License at
8e509ee18Sopenharmony_ci
9e509ee18Sopenharmony_ci    http://www.apache.org/licenses/LICENSE-2.0
10e509ee18Sopenharmony_ci
11e509ee18Sopenharmony_ciUnless required by applicable law or agreed to in writing, software
12e509ee18Sopenharmony_cidistributed under the License is distributed on an "AS IS" BASIS,
13e509ee18Sopenharmony_ciWITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14e509ee18Sopenharmony_ciSee the License for the specific language governing permissions and
15e509ee18Sopenharmony_cilimitations under the License.
16e509ee18Sopenharmony_ci
17e509ee18Sopenharmony_ciDescription: Action words of hdc fport.
18e509ee18Sopenharmony_ci"""
19e509ee18Sopenharmony_ci
20e509ee18Sopenharmony_ciimport logging
21e509ee18Sopenharmony_ciimport subprocess
22e509ee18Sopenharmony_ci
23e509ee18Sopenharmony_ci
24e509ee18Sopenharmony_ciclass Fport(object):
25e509ee18Sopenharmony_ci    retry_times = 3
26e509ee18Sopenharmony_ci    increase_step = 7
27e509ee18Sopenharmony_ci
28e509ee18Sopenharmony_ci    @classmethod
29e509ee18Sopenharmony_ci    def fport_connect_server(cls, port, pid, bundle_name):
30e509ee18Sopenharmony_ci        for _ in range(Fport.retry_times):
31e509ee18Sopenharmony_ci            cmd = ['hdc', 'fport', f'tcp:{port}', f'ark:{pid}@{bundle_name}']
32e509ee18Sopenharmony_ci            logging.info('fport connect server: ' + ' '.join(cmd))
33e509ee18Sopenharmony_ci            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
34e509ee18Sopenharmony_ci            logging.info(result.stdout.strip())
35e509ee18Sopenharmony_ci            if 'TCP Port listen failed' not in result.stdout.decode('utf-8'):
36e509ee18Sopenharmony_ci                assert result.stdout.decode('utf-8').strip() == 'Forwardport result:OK'
37e509ee18Sopenharmony_ci                return port
38e509ee18Sopenharmony_ci            else:    # The port is occupied
39e509ee18Sopenharmony_ci                port += Fport.increase_step
40e509ee18Sopenharmony_ci        return -1
41e509ee18Sopenharmony_ci
42e509ee18Sopenharmony_ci    @classmethod
43e509ee18Sopenharmony_ci    def fport_debugger_server(cls, port, pid, tid=0):
44e509ee18Sopenharmony_ci        for _ in range(Fport.retry_times):
45e509ee18Sopenharmony_ci            if tid == 0:
46e509ee18Sopenharmony_ci                cmd = ['hdc', 'fport', f'tcp:{port}', f'ark:{pid}@Debugger']
47e509ee18Sopenharmony_ci            else:
48e509ee18Sopenharmony_ci                cmd = ['hdc', 'fport', f'tcp:{port}', f'ark:{pid}@{tid}@Debugger']
49e509ee18Sopenharmony_ci            logging.info('fport_debugger_server: ' + ' '.join(cmd))
50e509ee18Sopenharmony_ci            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
51e509ee18Sopenharmony_ci            logging.info(result.stdout.strip())
52e509ee18Sopenharmony_ci            if 'TCP Port listen failed' not in result.stdout.decode('utf-8'):
53e509ee18Sopenharmony_ci                assert result.stdout.decode('utf-8').strip() == 'Forwardport result:OK'
54e509ee18Sopenharmony_ci                return port
55e509ee18Sopenharmony_ci            else:    # The port is occupied
56e509ee18Sopenharmony_ci                port += Fport.increase_step
57e509ee18Sopenharmony_ci        return -1
58e509ee18Sopenharmony_ci
59e509ee18Sopenharmony_ci    @classmethod
60e509ee18Sopenharmony_ci    def clear_fport(cls):
61e509ee18Sopenharmony_ci        list_fport_cmd = ['hdc', 'fport', 'ls']
62e509ee18Sopenharmony_ci        list_fport_result = subprocess.run(list_fport_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
63e509ee18Sopenharmony_ci        logging.info(list_fport_result.stdout.strip())
64e509ee18Sopenharmony_ci        list_fport_out = list_fport_result.stdout.decode('utf-8')
65e509ee18Sopenharmony_ci        if 'Empty' in list_fport_out:
66e509ee18Sopenharmony_ci            return
67e509ee18Sopenharmony_ci        for fport_item in [item for item in list_fport_out.split('[Forward]') if item != '\r\n']:
68e509ee18Sopenharmony_ci            un_fport_command = (['hdc', 'fport', 'rm'] + [fport_item.split('    ')[1].split(' ')[0]] +
69e509ee18Sopenharmony_ci                                [fport_item.split('    ')[1].split(' ')[1]])
70e509ee18Sopenharmony_ci            un_fport_result = subprocess.run(un_fport_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
71e509ee18Sopenharmony_ci            logging.info(un_fport_result.stdout.strip())
72e509ee18Sopenharmony_ci            assert 'success' in un_fport_result.stdout.decode('utf-8')
73