#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Runtime environment: Python 3.10+, tqdm (optional, only used for progress bars)
#   pip install tqdm
#   python hdc_recv_all_test.py

import datetime
import logging
import os
import stat
import subprocess
from multiprocessing import Pool

try:
    import tqdm
except ImportError:  # the progress bar is cosmetic; run without it if tqdm is absent
    tqdm = None

cwd = os.getcwd()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Header rows of the merged CSV, keyed by test action. The column order matches
# the rows produced by file_recv_check / file_copy_check (prefixed by file_id).
_CSV_HEADERS = {
    'recv': "file_id,ls result,recv result,full path,save dir,ls output,recv output\n",
    'copy': "file_id,ls result,recv result,full path,save dir,ls output,copy output,"
            "copied ls output,recv output,remove output\n",
}


def _check_output_text(argv):
    """
    Run argv and return its stdout as text.

    Undecodable bytes are replaced instead of raising, so one odd file name
    cannot abort a whole scan. Raises subprocess.CalledProcessError when the
    command exits with a non-zero status.
    """
    return subprocess.check_output(argv).decode(errors="replace")


def _flatten(text):
    """
    Collapse every line break in text into a single space so the text fits
    into one CSV row.
    """
    return text.replace('\r\n', ' ').replace('\r', ' ').replace('\n', ' ')


def _open_for_write(path):
    """
    Open path for writing with owner-only read/write permissions.

    O_TRUNC is required: without it, rewriting an existing longer file would
    leave stale bytes from the previous run at the end of the new content.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    modes = stat.S_IRUSR | stat.S_IWUSR
    return os.fdopen(os.open(path, flags, modes), 'w')


def get_files_list(args=""):
    """
    Return the lines printed by `hdc shell find <args>`.

    args is a whitespace-separated string of extra arguments for `find`;
    an empty string runs a bare `find`.
    """
    cmd = ["hdc", "shell", "find"]
    if args:
        cmd.extend(args.split())
    # splitlines() copes with both "\r\n" and "\n" terminated output and does
    # not lose the final entry when the output lacks a trailing newline.
    return _check_output_text(cmd).splitlines()


def ls_z_check(path):
    """
    Return the output of `ls -alZ <path>` on the device as a single line.

    Line breaks and commas are replaced by spaces so the result can be stored
    in one CSV field.
    """
    cmd = f"hdc shell \"ls -alZ {path}\""
    return _flatten(_check_output_text(cmd.split())).replace(',', ' ')


def file_copy_check(local, remote, file_id):
    """
    Copy remote into a temporary directory on the device, then receive that
    copy with hdc.

    Returns one comma-separated record:
    ls ok, recv ok, remote path, local dir, ls output, copy output,
    ls output of the copy, recv output, remove output.
    Returns "file_id error" when file_id is negative.
    """
    if file_id < 0:
        return "file_id error"
    check_ret = False
    recv_ret = False
    copy_output = ""
    check_output = ""
    recv_output = ""
    remove_output = ""
    list_z_check = ls_z_check(remote)

    # An "ls:" prefix in the output is how ls reports a failure.
    if "ls:" not in list_z_check:
        check_ret = True
        local_real = os.path.join(local, str(file_id))
        if local != "":
            os.makedirs(local, exist_ok=True)
        device_tmp_dir = "data/local/tmp"
        copy_cmd = f"hdc shell mkdir -p {device_tmp_dir}/{file_id} && cp -rf {remote} {device_tmp_dir}/{file_id}/"
        shell_cmd = f"hdc shell ls -alZ {device_tmp_dir}/{file_id}/"
        recv_cmd = ["hdc", "file", "recv", f"{device_tmp_dir}/{file_id}", local]
        remove_cmd = f"hdc shell rm -rf {device_tmp_dir}/{file_id}"
        copy_output = _check_output_text(copy_cmd.split()).replace(',', ' ')
        check_output = _check_output_text(shell_cmd.split()).replace(',', ' ')
        recv_output = _check_output_text(recv_cmd).replace(',', ' ')
        remove_output = _check_output_text(remove_cmd.split()).replace(',', ' ')
        if os.path.exists(local_real):
            recv_ret = True
    # Every field is joined by exactly one comma so the record lines up with
    # the ten-column 'copy' header written by merge_file.
    fields = (check_ret, recv_ret, remote, local, list_z_check,
              copy_output, check_output, recv_output, remove_output)
    return ",".join(str(field) for field in fields)


def file_recv_check(local, remote, file_id):
    """
    Receive remote directly with `hdc file recv` into <local>/<file_id>.

    Returns one comma-separated record:
    ls ok, recv ok, remote path, local dir, ls output, recv output.
    Returns "file_id error" when file_id is negative.
    """
    if file_id < 0:
        return "file_id error"
    check_ret = False
    recv_ret = False
    recv_output = ""

    list_z_check = ls_z_check(remote)
    local_real = os.path.join(local, str(file_id))
    if "ls:" not in list_z_check:
        check_ret = True
        if local != "":
            os.makedirs(local, exist_ok=True)
        # Pass an argv list: a plain command string is not executable through
        # subprocess on POSIX without a shell, and a list keeps paths that
        # contain spaces as a single argument.
        recv_cmd = ["hdc", "file", "recv", remote, local_real]
        recv_output = run_command_with_timeout(recv_cmd, timeout=60)[0].replace(',', ' ')
        if "FileTransfer finish" in recv_output or os.path.exists(local_real):
            recv_ret = True
    return f"{check_ret},{recv_ret},{remote},{local},{list_z_check},{recv_output}"


def run_command_with_timeout(command, timeout):
    """
    Run command and return (stdout, stderr) as text.

    command may be an argv list or a command string. On failure the first
    element is "" and the second describes the problem: the command's stderr
    for a non-zero exit, "Command timed out" for a timeout, or the OS error
    text when the program could not be started.
    """
    if isinstance(command, str) and os.name != "nt":
        # POSIX treats a bare string as the program name; Windows accepts a
        # full command line, so the string is left untouched there.
        command = command.split()
    try:
        result = subprocess.run(command, check=True, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, timeout=timeout)
    except subprocess.TimeoutExpired:
        return "", "Command timed out"
    except subprocess.CalledProcessError as e:
        return "", e.stderr.decode(errors="replace")
    except OSError as e:
        return "", str(e)
    return result.stdout.decode(errors="replace"), result.stderr.decode(errors="replace")


def write_list_csv(device_path="", host_tmp_dir=""):
    """
    Write the non-empty lines of get_files_list(device_path) to
    <host_tmp_dir>/files_list.csv, one per line.
    """
    list_file = get_files_list(device_path)
    save_file = os.path.join(host_tmp_dir, "files_list.csv")
    with _open_for_write(save_file) as fd:
        fd.writelines(f"{file_name}\n" for file_name in list_file if file_name != "")
    logging.info("List of files written to %s", save_file)


def process_files(tmp_dir, files, process_id, chunk_size, action):
    """
    Check every path in files and write one CSV row per path to
    <tmp_dir>/recv_files_list_<process_id>.csv.

    Received data goes to <tmp_dir>/recv_<process_id>_dir. action selects the
    check: 'recv' (file_recv_check) or 'copy' (file_copy_check); any other
    value writes rows containing only the file id. Ids are computed as
    process_id * chunk_size + position, with positions starting at 1.
    """
    save_dir = os.path.join(tmp_dir, f"recv_{process_id}_dir")
    save_csv = os.path.join(tmp_dir, f"recv_files_list_{process_id}.csv")
    os.makedirs(save_dir, exist_ok=True)
    entries = tqdm.tqdm(files) if tqdm is not None else files
    with _open_for_write(save_csv) as fd:
        for cnt, file in enumerate(entries, start=1):
            file_name = file.strip()
            file_id = process_id * chunk_size + cnt
            if action == 'recv':
                out_string = file_recv_check(save_dir, file_name, file_id)
            elif action == 'copy':
                out_string = file_copy_check(save_dir, file_name, file_id)
            else:
                out_string = ""
            # A row must stay on one physical line, whatever line ending the
            # tool output used.
            data = _flatten(f"{file_id},{out_string}")
            fd.write(f"{data}\n")


def split_files(file_name, split_num):
    """
    Split the lines of file_name into split_num consecutive chunks.

    Each chunk holds round(line_count / split_num) lines, except the last
    one, which runs to the end of the file. Returns [] (after logging an
    error) when the file does not exist or split_num is not positive.
    """
    if not os.path.exists(file_name):
        logging.error("File %s not found", file_name)
        return []
    if split_num <= 0:
        logging.error("Split number must be greater than 0")
        return []
    with open(file_name, 'r') as fd:
        lines = fd.readlines()
    size = len(lines)
    chunk_size = round(size / split_num)
    all_list = []
    for i in range(split_num):
        start = i * chunk_size
        # The last chunk absorbs whatever the rounding left over.
        end = size if i == split_num - 1 else start + chunk_size
        all_list.append(lines[start:end])
    return all_list


def merge_file(tmp_dir, file_num, input_name, action='recv'):
    """
    Merge <tmp_dir>/<input_name>_<i>.csv for i in range(file_num) into
    <tmp_dir>/merge_dir.csv.

    A header row matching action ('recv' or 'copy') is written first; any
    other action writes no header.
    """
    files = [os.path.join(tmp_dir, f"{input_name}_{i}.csv") for i in range(file_num)]
    new_file = os.path.join(tmp_dir, "merge_dir.csv")
    header = _CSV_HEADERS.get(action, "")
    with _open_for_write(new_file) as fd:
        fd.write(header)
        for file in files:
            readline_merge(file, fd)


def readline_merge(input_file, output_fd):
    """
    Append every line of input_file to the open file object output_fd.
    """
    with open(input_file, 'r') as f:
        output_fd.writelines(f)


def run_file_check_all(tmp_dir, subprocess_num, action):
    """
    Check every path listed in <tmp_dir>/files_list.csv using subprocess_num
    worker processes, then merge the per-worker CSV files.
    """
    file_name = os.path.join(tmp_dir, "files_list.csv")
    all_list = split_files(file_name, subprocess_num)
    if not all_list:
        # split_files has already logged why there is nothing to process.
        return
    input_name = "recv_files_list"
    chunk_size = len(all_list[0])
    with Pool(processes=subprocess_num) as pool:
        results = [
            pool.apply_async(process_files, args=(tmp_dir, chunk, i, chunk_size, action))
            for i, chunk in enumerate(all_list)
        ]
        pool.close()
        pool.join()
        # apply_async stores worker exceptions in the result object; fetch
        # them so that a failed worker is reported instead of passing silently.
        for i, result in enumerate(results):
            try:
                result.get()
            except Exception:
                logging.exception("Worker %d failed; its CSV may be incomplete", i)
    merge_file(tmp_dir, subprocess_num, input_name, action)


def main():
    """
    Entry point.

    The first run within an hour collects the device directory list into
    <cwd>/<UTC date_hour>/files_list.csv. A later run in the same hour finds
    that list and runs the selected 'recv' or 'copy' check over it.
    """
    time_stamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H")
    tmp_dir = os.path.join(cwd, time_stamp)
    save_file = os.path.join(tmp_dir, "files_list.csv")
    if not os.path.exists(save_file):
        input("第一次执行请使用root版本,\n请按下[ENTER]确认开始收集")
        os.makedirs(tmp_dir, exist_ok=True)
        args = "/ -type d 2>/dev/null"
        write_list_csv(args, tmp_dir)
        return

    subprocess_num = 14
    num = input("第二次执行请使用user版本,\n请确保第一次执行和第二次执行处在同一小时\n"
                "请输入数字选择测试内容:\n1. [recv]user版本文件直接收取\n2. [copy]user版本文件在cp后间接收取\n")
    if num == "1":
        action = 'recv'
    elif num == "2":
        action = 'copy'
    else:
        logging.error("输入错误,测试程序已退出,请重新执行")
        return
    run_file_check_all(tmp_dir, subprocess_num, action)


if __name__ == "__main__":
    main()