1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2024 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15# 运行环境: python 3.10+, tqdm
16# pip install tqdm
17# python hdc_recv_all_test.py
18
19import datetime
20import subprocess
21import os
22import stat
23import logging
24from multiprocessing import Pool
25import tqdm
26
27cwd = os.getcwd()
28logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
29
30
31def get_files_list(args=""):
32    """
33    This function returns a list of files for the input args.
34    """
35
36    if args == "":
37        cmd = f"hdc shell find"
38    else:
39        cmd = f"hdc shell find {args}"
40    output = subprocess.check_output(cmd.split()).decode()
41    return output.split('\r\n')[:-1]
42
43
44def ls_z_check(path):
45    """
46    List all files in directory with selinux context.
47    """
48
49    cmd = f"hdc shell \"ls -alZ {path}\""
50    output = subprocess.check_output(cmd.split()).decode()
51    return output.replace('\r\n', ' ').replace(',', ' ').replace('\r', ' ')
52
53
54def file_copy_check(local, remote, file_id):
55    """
56    Check if the file has been successfully copied and could be transferred by hdc.
57    """
58
59    if file_id >= 0:
60        pass
61    else:
62        return f"file_id error"
63    check_ret = False
64    recv_ret = False
65    copy_output = ""
66    check_output = ""
67    recv_output = ""
68    remove_output = ""
69    list_z_check = ls_z_check(remote)
70
71    if "ls:" not in list_z_check:
72        check_ret = True
73    local_real = os.path.join(local, str(file_id))
74    if local != "":
75        if os.path.exists(local):
76            pass
77        else:
78            os.mkdir(local)
79        device_tmp_dir = "data/local/tmp"
80        copy_cmd = f"hdc shell mkdir -p {device_tmp_dir}/{file_id} && cp -rf {remote} {device_tmp_dir}/{file_id}/"
81        shell_cmd = f"hdc shell ls -alZ {device_tmp_dir}/{file_id}/"
82        recv_cmd = f"hdc file recv {device_tmp_dir}/{file_id} {local}"
83        remove_cmd = f"hdc shell rm -rf {device_tmp_dir}/{file_id}"
84        copy_output = subprocess.check_output(copy_cmd.split()).decode().replace(',', ' ')
85        check_output = subprocess.check_output(shell_cmd.split()).decode().replace(',', ' ')
86        recv_output = subprocess.check_output(recv_cmd.split()).decode().replace(',', ' ')
87        remove_output = subprocess.check_output(remove_cmd.split()).decode().replace(',', ' ')
88        if os.path.exists(local_real):
89            recv_ret = True
90    out_string = f"{check_ret},{recv_ret},{remote},{local}\
91        {list_z_check},{copy_output},{check_output},{recv_output},{remove_output}"
92    return out_string
93
94
95def file_recv_check(local, remote, file_id):
96    """
97    Check if the file could be transferred by hdc directly.
98    """
99
100    if file_id >= 0:
101        pass
102    else:
103        return f"file_id error"
104    check_ret = False
105    recv_ret = False
106    recv_output = ""
107
108    list_z_check = ls_z_check(remote)
109    local_real = os.path.join(local, str(file_id))
110    if "ls:" not in list_z_check:
111        check_ret = True
112    if local != "":
113        if os.path.exists(local):
114            pass
115        else:
116            os.mkdir(local)
117        recv_cmd = f"hdc file recv {remote} {local_real}"
118        recv_output = run_command_with_timeout(recv_cmd, timeout=60)[0].replace(',', ' ')
119        if "FileTransfer finish" in recv_output:
120            recv_ret = True
121        elif os.path.exists(local_real):
122            recv_ret = True
123        out_string = f"{check_ret},{recv_ret},{remote},{local},{list_z_check},{recv_output}"
124    return out_string
125
126
127def run_command_with_timeout(command, timeout):
128    """
129    Run a command with timeout.
130    """
131
132    try:
133        result = subprocess.run(command, check=True, stdout=subprocess.PIPE,
134            stderr=subprocess.PIPE, timeout=timeout)
135        return result.stdout.decode(), result.stderr.decode()
136    except subprocess.TimeoutExpired:
137        return "", "Command timed out"
138    except subprocess.CalledProcessError as e:
139        return "", e.stderr.decode()
140
141
142def write_list_csv(device_path="", host_tmp_dir=""):
143    """
144    Write the output of the file check result to a csv file.
145    """
146
147    list_file = get_files_list(device_path)
148    save_file = os.path.join(host_tmp_dir, "files_list.csv")
149    flags = os.O_WRONLY | os.O_CREAT
150    modes = stat.S_IRUSR | stat.S_IWUSR
151    with os.fdopen(os.open(save_file, flags, modes), 'w') as fd:
152        for file_name in list_file:
153            if file_name != "":
154                fd.write(f"{file_name}\n")
155    logging.info("List of files written to %s", save_file)
156
157
158def process_files(tmp_dir, files, process_id, chunk_size, action):
159    """
160    Process files in chunks.
161    """
162
163    save_dir = os.path.join(tmp_dir, f"recv_{process_id}_dir")
164    save_csv = os.path.join(tmp_dir, f"recv_files_list_{process_id}.csv")
165    if not os.path.exists(save_dir):
166        os.mkdir(save_dir)
167    cnt = 0
168    flags = os.O_WRONLY | os.O_CREAT
169    modes = stat.S_IRUSR | stat.S_IWUSR
170    with os.fdopen(os.open(save_csv, flags, modes), 'w') as fd:
171        for file in tqdm.tqdm(files):
172            file_name = file.strip()
173            cnt += 1
174            file_id = process_id * chunk_size + cnt
175            if action == 'recv':
176                out_string = file_recv_check(save_dir, file_name, file_id)
177            elif action == 'copy':
178                out_string = file_copy_check(save_dir, file_name, file_id)
179            else:
180                out_string = ""
181            data = f"{str(file_id)},{out_string}".replace('\r\n', ' ').replace('\r', ' ')
182            fd.write(f"{data}\n")
183
184
185def split_files(file_name, split_num):
186    """
187    Multiprocess split file input list into chunks.
188    """
189
190    if not os.path.exists(file_name):
191        logging.error("File %s not found", file_name)
192        return []
193    if split_num <= 0:
194        logging.error("Split number must be greater than 0")
195        return []
196    all_list = []
197    with open(file_name, 'r') as fd:
198        lines = fd.readlines()
199        size = len(lines)
200        chunk_size = round(size / split_num)
201        for i in range(split_num):
202            start = i * chunk_size
203            end = start + chunk_size
204            if i == split_num - 1:
205                end = size
206            table_list = lines[start:end]
207            all_list.append(table_list)
208    return all_list
209
210
211def merge_file(tmp_dir, file_num, input_name, action='recv'):
212    """
213    Merge ouput csv into single file.
214    """
215
216    files = []
217    header = ""
218    for i in range(file_num):
219        file = os.path.join(tmp_dir, f"{input_name}_{i}.csv")
220        files.append(file)
221    new_file = os.path.join(tmp_dir, f"merge_dir.csv")
222    if action == 'recv':
223        header = "file_id,ls result,recv result,full path,save dir,ls output,recv output\n"
224    elif action == 'copy':
225        header = "file_id,ls result,recv result,full path,dave dir,ls output,copy output,copied ls output,\
226            recv output,remove output\n"
227    flags = os.O_WRONLY | os.O_CREAT
228    modes = stat.S_IRUSR | stat.S_IWUSR
229    with os.fdopen(os.open(new_file, flags, modes), 'w') as fd:
230        fd.write(header)
231        for file in files:
232            readline_merge(file, fd)
233
234
235def readline_merge(input_file, output_fd):
236    """
237    Read file line by line and write to output file.
238    """
239
240    with open(input_file, 'r') as f:
241        lines = f.readlines()
242        for line in lines:
243            output_fd.write(line)
244
245
246def run_file_check_all(tmp_dir, subprocess_num, action):
247    """
248    Run file check, for all the files.
249    """
250
251    file_name = os.path.join(tmp_dir, "files_list.csv")
252    all_list = split_files(file_name, subprocess_num)
253    results = []
254    input_name = "recv_files_list"
255    with Pool(processes=subprocess_num) as pool:
256        for i in range(subprocess_num):
257            result = pool.apply_async(process_files, args=(tmp_dir, all_list[i], i, len(all_list[0]), action))
258            results.append(result)
259
260        pool.close()
261        pool.join()
262        merge_file(tmp_dir, subprocess_num, input_name, action)
263
264
265def main():
266    """
267    Main function.
268    """
269
270    time_stamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H")
271    tmp_dir = os.path.join(cwd, time_stamp)
272    save_file = os.path.join(tmp_dir, "files_list.csv")
273    if not os.path.exists(save_file):
274        input("第一次执行请使用root版本,\n请按下[ENTER]确认开始收集")
275        if not os.path.exists(tmp_dir):
276            os.mkdir(tmp_dir)
277        args = "/ -type d 2>/dev/null"
278        write_list_csv(args, tmp_dir)
279    else:
280        subprocess_num = 14
281        num = input("第二次执行请使用user版本,\n请确保第一次执行和第二次执行处在同一小时\n"\
282            "请输入数字选择测试内容:\n1. [recv]user版本文件直接收取\n2. [copy]user版本文件在cp后间接收取\n")
283        action = ""
284        if num == "1":
285            action = 'recv'
286        elif num == "2":
287            action = 'copy'
288        else:
289            logging.error("输入错误,测试程序已退出,请重新执行")
290            return
291        run_file_check_all(tmp_dir, subprocess_num, action)
292
293
294if __name__ == "__main__":
295    main()