#!/usr/bin/env python3
"""Functional tests for the ``lz4 --list`` command.

When run as a script this module generates sample files in the system
temporary directory, compresses them with the ``lz4`` binary located
relative to this file, runs the ``unittest`` cases below against the
output of ``lz4 --list`` (plain and ``-v``), and finally removes the
generated files.
"""
import subprocess
import time
import glob
import os
import tempfile
import unittest
import sys

SIZES = [3, 11]  # Always 2 sizes
MIB = 1048576
LZ4 = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/../lz4")
if not os.path.exists(LZ4):
    LZ4 = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/../programs/lz4")
TEMP = tempfile.gettempdir()


class NVerboseFileInfo(object):
    """One data row of the non-verbose ``lz4 --list`` table.

    The row is split on whitespace into seven fields (frames, type, block,
    compressed, uncompressed, ratio, filename).  The expected compressed and
    uncompressed sizes are then measured from the files in ``TEMP``.
    """

    def __init__(self, line_in):
        """Parse ``line_in``; terminates the process via ``errout`` if it
        does not contain exactly seven whitespace-separated fields."""
        self.line = line_in
        splitlines = line_in.split()
        if len(splitlines) != 7:
            errout(f"Unexpected line: {line_in}")
        (self.frames, self.type, self.block, self.compressed,
         self.uncompressed, self.ratio, self.filename) = splitlines
        self.exp_unc_size = 0
        # Get real file sizes
        if "concat-all" in self.filename or "2f--content-size" in self.filename:
            # These files are built from every size in SIZES.
            for i in SIZES:
                self.exp_unc_size += os.path.getsize(f"{TEMP}/test_list_{i}M")
        else:
            # Everything before the first "-" is the uncompressed source name.
            uncompressed_filename = self.filename.split("-")[0]
            self.exp_unc_size += os.path.getsize(f"{TEMP}/{uncompressed_filename}")
        self.exp_comp_size = os.path.getsize(f"{TEMP}/{self.filename}")


class TestNonVerbose(unittest.TestCase):
    """Checks on the one-line-per-file output of ``lz4 --list -m``."""

    @classmethod
    def setUpClass(cls):
        """List every generated ``.lz4`` file and parse each data row."""
        cls.nvinfo_list = []
        test_list_files = glob.glob(f"{TEMP}/test_list_*.lz4")
        if not test_list_files:
            raise unittest.SkipTest(f"No test_list_*.lz4 files in {TEMP}; run generate_files() first")
        for filename in test_list_files:
            listing = execute(f"{LZ4} --list -m {filename}", print_output=True)
            for line_no, line in enumerate(listing):
                # The first line of the listing is skipped (column header).
                if line_no > 0:
                    cls.nvinfo_list.append(NVerboseFileInfo(line))

    def test_frames(self):
        """Per-file frame counts are right and add up to concat-all's count."""
        all_concat_frames = 0
        all_concat_index = None
        for i, nvinfo in enumerate(self.nvinfo_list):
            if "concat-all" in nvinfo.filename:
                all_concat_index = i
            elif "2f--content-size" in nvinfo.filename:
                self.assertEqual("2", nvinfo.frames, nvinfo.line)
                all_concat_frames += 2
            else:
                self.assertEqual("1", nvinfo.frames, nvinfo.line)
                all_concat_frames += 1
        self.assertNotEqual(None, all_concat_index, "Couldn't find concat-all file index.")
        concat_info = self.nvinfo_list[all_concat_index]
        self.assertEqual(concat_info.frames, str(all_concat_frames), concat_info.line)

    def test_frame_types(self):
        """The type column matches the frame kind encoded in the file name."""
        for nvinfo in self.nvinfo_list:
            if "-lz4f-" in nvinfo.filename:
                self.assertEqual(nvinfo.type, "LZ4Frame", nvinfo.line)
            elif "-legc-" in nvinfo.filename:
                self.assertEqual(nvinfo.type, "LegacyFrame", nvinfo.line)
            elif "-skip-" in nvinfo.filename:
                self.assertEqual(nvinfo.type, "SkippableFrame", nvinfo.line)

    def test_block(self):
        """The block column matches the block mode encoded in the file name."""
        # NOTE(review): generate_files() names files "...-1f-BD.lz4" (single
        # dash before "BD"/"BI"), so these "--BD"/"--BI" checks look like they
        # never match - confirm the intended substring.
        for nvinfo in self.nvinfo_list:
            if "--BD" in nvinfo.filename:
                self.assertRegex(nvinfo.block, "^B[0-9]+D$", nvinfo.line)
            elif "--BI" in nvinfo.filename:
                self.assertRegex(nvinfo.block, "^B[0-9]+I$", nvinfo.line)

    def test_compressed_size(self):
        """The compressed column equals the humanized on-disk size."""
        for nvinfo in self.nvinfo_list:
            self.assertEqual(nvinfo.compressed, to_human(nvinfo.exp_comp_size), nvinfo.line)

    def test_ratio(self):
        """The ratio column equals compressed/uncompressed as a percentage."""
        for nvinfo in self.nvinfo_list:
            if "--content-size" in nvinfo.filename:
                expected = f"{float(nvinfo.exp_comp_size) / float(nvinfo.exp_unc_size) * 100:.2f}%"
                self.assertEqual(nvinfo.ratio, expected, nvinfo.line)

    def test_uncompressed_size(self):
        """The uncompressed column equals the humanized source size."""
        for nvinfo in self.nvinfo_list:
            if "--content-size" in nvinfo.filename:
                self.assertEqual(nvinfo.uncompressed, to_human(nvinfo.exp_unc_size), nvinfo.line)


class VerboseFileInfo(object):
    """Per-file section of the verbose ``lz4 --list -v`` output.

    ``lines[0]`` is stored as the file name line, ``lines[1]`` is skipped as
    the column header, and every following line becomes one dict in
    ``frame_list`` keyed by column name (plus the raw text under "line").
    """

    def __init__(self, lines):
        # Parse lines
        self.frame_list = []
        self.file_frame_map = []
        for i, line in enumerate(lines):
            if i == 0:
                self.filename = line
                continue
            elif i == 1:
                # Skip header
                continue
            frame_info = dict(zip(["frame", "type", "block", "checksum", "compressed", "uncompressed", "ratio"], line.split()))
            frame_info["line"] = line
            self.frame_list.append(frame_info)


class TestVerbose(unittest.TestCase):
    """Checks on the per-frame output of ``lz4 --list -m -v``."""

    @classmethod
    def setUpClass(cls):
        """Run the verbose listing and build the expected frame-to-file map."""
        # Even though we're listing 2 files to test multiline working as expected,
        # we're only really interested in testing the output of the concat-all file.
        cls.vinfo_list = []
        test_list_SM_lz4f = glob.glob(f"{TEMP}/test_list_*M-lz4f-2f--content-size.lz4")
        if not test_list_SM_lz4f:
            raise unittest.SkipTest(f"No 2-frame test file in {TEMP}; run generate_files() first")
        for filename in test_list_SM_lz4f:
            output = execute(f"{LZ4} --list -m -v {TEMP}/test_list_concat-all.lz4 {filename}", print_output=True)
            # Index 0 doubles as "nothing seen yet".
            # NOTE(review): presumably the output never starts with a file
            # name line (e.g. a banner comes first) - confirm against lz4.
            start = end = 0
            for line_no, line in enumerate(output):
                if line.startswith("test_list"):
                    # A new file section begins: flush the previous one.
                    if start != 0 and end != 0:
                        cls.vinfo_list.append(VerboseFileInfo(output[start:end]))
                    start = line_no
                if not line:
                    # A blank line marks the end of the current section.
                    end = line_no
            cls.vinfo_list.append(VerboseFileInfo(output[start:end]))
        # Populate file_frame_map as a reference of the expected info.
        # Sorted so the order is deterministic and identical to the order in
        # which generate_files() concatenated the frames ("[!concat]" only
        # excludes names whose next character is one of c, o, n, a, t).
        concat_file_list = sorted(glob.glob(f"{TEMP}/test_list_[!concat]*.lz4"))
        # One of the files has 2 frames so duplicate it in this list to map each frame 1 to a single file
        for i, filename in enumerate(concat_file_list):
            if "2f--content-size" in filename:
                concat_file_list.insert(i, filename)
                break
        cls.cvinfo = cls.vinfo_list[0]
        cls.cvinfo.file_frame_map = concat_file_list
        cls.cvinfo.compressed_size = os.path.getsize(f"{TEMP}/test_list_concat-all.lz4")

    def test_filename(self):
        """Each section header carries the file name and an (index/total) tag."""
        total = len(self.vinfo_list)
        for i, vinfo in enumerate(self.vinfo_list):
            self.assertRegex(vinfo.filename, f"^test_list_.*({i + 1}/{total})")

    def test_frame_number(self):
        """Frames are numbered consecutively from 1 within each file."""
        for vinfo in self.vinfo_list:
            for i, frame_info in enumerate(vinfo.frame_list):
                self.assertEqual(frame_info["frame"], str(i + 1), frame_info["line"])

    def test_frame_type(self):
        """Each frame's type matches the kind of the file it came from."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "-lz4f-" in source:
                self.assertEqual(frame_info["type"], "LZ4Frame", frame_info["line"])
            elif "-legc-" in source:
                self.assertEqual(frame_info["type"], "LegacyFrame", frame_info["line"])
            elif "-skip-" in source:
                self.assertEqual(frame_info["type"], "SkippableFrame", frame_info["line"])

    def test_block(self):
        """Each frame's block column matches the mode of its source file."""
        # NOTE(review): generate_files() names files "...-1f-BD.lz4" (single
        # dash before "BD"/"BI"), so these "--BD"/"--BI" checks look like they
        # never match - confirm the intended substring.
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "--BD" in source:
                self.assertRegex(frame_info["block"], "^B[0-9]+D$", frame_info["line"])
            elif "--BI" in source:
                # The expected value is a pattern, so it must be matched as a
                # regex rather than compared for equality.
                self.assertRegex(frame_info["block"], "^B[0-9]+I$", frame_info["line"])

    def test_checksum(self):
        """LZ4 frames created without --no-frame-crc report XXH32."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "-lz4f-" in source and "--no-frame-crc" not in source:
                self.assertEqual(frame_info["checksum"], "XXH32", frame_info["line"])

    def test_compressed(self):
        """Per-frame compressed sizes match their files and sum to the total."""
        total = 0
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "-2f-" not in source:
                expected_size = os.path.getsize(source)
                self.assertEqual(frame_info["compressed"], str(expected_size), frame_info["line"])
            total += int(frame_info["compressed"])
        self.assertEqual(total, self.cvinfo.compressed_size, f"Expected total sum ({total}) to match {self.cvinfo.filename} filesize")

    def test_uncompressed(self):
        """Frames with a content size report the size encoded in the name."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            # Parse the base name only: the directory part of the path may
            # itself contain "M" or "_" and would corrupt the slice below.
            ffm = os.path.basename(self.cvinfo.file_frame_map[i])
            if "-2f-" not in ffm and "--content-size" in ffm:
                expected_size_unc = int(ffm[ffm.rindex("_") + 1:ffm.index("M")]) * MIB
                self.assertEqual(frame_info["uncompressed"], str(expected_size_unc), frame_info["line"])

    def test_ratio(self):
        """The ratio column is consistent with the frame's own size columns."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            if "--content-size" in self.cvinfo.file_frame_map[i]:
                self.assertEqual(frame_info['ratio'],
                                 f"{float(frame_info['compressed']) / float(frame_info['uncompressed']) * 100:.2f}%",
                                 frame_info["line"])


def to_human(size):
    """Format ``size`` with two decimals and a 1024-based unit suffix."""
    for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']:
        if size < 1024.0:
            break
        size /= 1024.0
    return f"{size:.2f}{unit}"


def log(text):
    """Print ``text`` prefixed with the current local timestamp."""
    print(time.strftime("%Y/%m/%d %H:%M:%S") + ' - ' + text)


def errout(text, err=1):
    """Log ``text`` and terminate the process with exit status ``err``."""
    log(text)
    # sys.exit instead of the site-provided exit(), which is not guaranteed
    # to exist (e.g. when Python runs with -S).
    sys.exit(err)


def execute(command, print_command=True, print_output=False, print_error=True):
    """Run ``command`` and return its stdout followed by stderr as lines.

    The command string is split on single spaces (no shell is involved) and
    is prefixed with ``$QEMU_SYS`` when that environment variable is set.
    A non-zero return code terminates the process through ``errout``.
    """
    if os.environ.get('QEMU_SYS'):
        command = f"{os.environ['QEMU_SYS']} {command}"
    if print_command:
        log("> " + command)
    popen = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout_lines, stderr_lines = popen.communicate()
    stderr_lines = stderr_lines.decode("utf-8")
    stdout_lines = stdout_lines.decode("utf-8")
    if print_output:
        if stdout_lines:
            print(stdout_lines)
        if stderr_lines:
            print(stderr_lines)
    if popen.returncode is not None and popen.returncode != 0:
        # Avoid printing stderr twice when print_output already showed it.
        if stderr_lines and not print_output and print_error:
            print(stderr_lines)
        errout(f"Failed to run: {command}, {stdout_lines + stderr_lines}\n")
    return (stdout_lines + stderr_lines).splitlines()


def cleanup(silent=False):
    """Delete every ``test_list*`` file in ``TEMP``, logging unless silent."""
    for f in glob.glob(f"{TEMP}/test_list*"):
        if not silent:
            log(f"Deleting {f}")
        os.unlink(f)


def datagen(file_name, size):
    """Create ``file_name`` with ``size`` bytes: a hole, then random data.

    The first ``size - size // 2`` bytes are skipped with ``seek`` and the
    remaining ``size // 2`` bytes are filled from ``os.urandom``.
    """
    non_sparse_size = size // 2
    sparse_size = size - non_sparse_size
    with open(file_name, "wb") as f:
        f.seek(sparse_size)
        f.write(os.urandom(non_sparse_size))


def generate_files():
    """Create the source files and every compressed variant under ``TEMP``."""
    # file format  ~ test_list<frametype>-<no_frames>f<create-args>.lz4 ~
    # Generate LZ4Frames
    for i in SIZES:
        filename = f"{TEMP}/test_list_{i}M"
        log(f"Generating {filename}")
        datagen(filename, i * MIB)
        for j in ["--content-size", "-BI", "-BD", "-BX", "--no-frame-crc"]:
            lz4file = f"{filename}-lz4f-1f{j}.lz4"
            execute(f"{LZ4} {j} {filename} {lz4file}")
        # Generate skippable frames
        lz4file = f"{filename}-skip-1f.lz4"
        skipsize = i * 1024
        # 4 header bytes followed by the payload length as little-endian u32.
        skipbytes = bytes([80, 42, 77, 24]) + skipsize.to_bytes(4, byteorder='little', signed=False)
        with open(lz4file, 'wb') as f:
            f.write(skipbytes)
            f.write(os.urandom(skipsize))
        # Generate legacy frames
        lz4file = f"{filename}-legc-1f.lz4"
        execute(f"{LZ4} -l {filename} {lz4file}")

    # Concatenate --content-size files
    file_list = sorted(glob.glob(f"{TEMP}/test_list_*-lz4f-1f--content-size.lz4"))
    with open(f"{TEMP}/test_list_{sum(SIZES)}M-lz4f-2f--content-size.lz4", 'ab') as outfile:
        for fname in file_list:
            with open(fname, 'rb') as infile:
                outfile.write(infile.read())

    # Concatenate all files.  glob() returns names in arbitrary order, so the
    # list is sorted: TestVerbose rebuilds the same sorted list to know which
    # file each frame of concat-all came from.
    file_list = sorted(glob.glob(f"{TEMP}/test_list_*.lz4"))
    with open(f"{TEMP}/test_list_concat-all.lz4", 'ab') as outfile:
        for fname in file_list:
            with open(fname, 'rb') as infile:
                outfile.write(infile.read())


if __name__ == '__main__':
    cleanup()
    generate_files()
    ret = unittest.main(verbosity=2, exit=False)
    cleanup(silent=True)
    sys.exit(not ret.result.wasSuccessful())