16881f68fSopenharmony_ci#!/usr/bin/env python3 26881f68fSopenharmony_ci 36881f68fSopenharmony_ciif __name__ == '__main__': 46881f68fSopenharmony_ci import pytest 56881f68fSopenharmony_ci import sys 66881f68fSopenharmony_ci sys.exit(pytest.main([__file__] + sys.argv[1:])) 76881f68fSopenharmony_ci 86881f68fSopenharmony_ciimport subprocess 96881f68fSopenharmony_ciimport os 106881f68fSopenharmony_ciimport sys 116881f68fSopenharmony_ciimport py 126881f68fSopenharmony_ciimport pytest 136881f68fSopenharmony_ciimport stat 146881f68fSopenharmony_ciimport shutil 156881f68fSopenharmony_ciimport filecmp 166881f68fSopenharmony_ciimport tempfile 176881f68fSopenharmony_ciimport time 186881f68fSopenharmony_ciimport errno 196881f68fSopenharmony_ciimport sys 206881f68fSopenharmony_ciimport platform 216881f68fSopenharmony_cifrom looseversion import LooseVersion 226881f68fSopenharmony_cifrom tempfile import NamedTemporaryFile 236881f68fSopenharmony_cifrom contextlib import contextmanager 246881f68fSopenharmony_cifrom util import (wait_for_mount, umount, cleanup, base_cmdline, 256881f68fSopenharmony_ci safe_sleep, basename, fuse_test_marker, test_printcap, 266881f68fSopenharmony_ci fuse_proto, powerset) 276881f68fSopenharmony_cifrom os.path import join as pjoin 286881f68fSopenharmony_ci 296881f68fSopenharmony_cipytestmark = fuse_test_marker() 306881f68fSopenharmony_ci 316881f68fSopenharmony_ciTEST_FILE = __file__ 326881f68fSopenharmony_ci 336881f68fSopenharmony_ciwith open(TEST_FILE, 'rb') as fh: 346881f68fSopenharmony_ci TEST_DATA = fh.read() 356881f68fSopenharmony_ci 366881f68fSopenharmony_cidef name_generator(__ctr=[0]): 376881f68fSopenharmony_ci __ctr[0] += 1 386881f68fSopenharmony_ci return 'testfile_%d' % __ctr[0] 396881f68fSopenharmony_ci 406881f68fSopenharmony_cioptions = [] 416881f68fSopenharmony_ciif sys.platform == 'linux': 426881f68fSopenharmony_ci options.append('clone_fd') 436881f68fSopenharmony_ci 446881f68fSopenharmony_cidef invoke_directly(mnt_dir, name, options): 456881f68fSopenharmony_ci cmdline = base_cmdline + [ pjoin(basename, 'example', name), 466881f68fSopenharmony_ci '-f', mnt_dir, '-o', ','.join(options) ] 476881f68fSopenharmony_ci if name == 'hello_ll': 486881f68fSopenharmony_ci # supports single-threading only 496881f68fSopenharmony_ci cmdline.append('-s') 506881f68fSopenharmony_ci 516881f68fSopenharmony_ci return cmdline 526881f68fSopenharmony_ci 536881f68fSopenharmony_cidef invoke_mount_fuse(mnt_dir, name, options): 546881f68fSopenharmony_ci return base_cmdline + [ pjoin(basename, 'util', 'mount.fuse3'), 556881f68fSopenharmony_ci name, mnt_dir, '-o', ','.join(options) ] 566881f68fSopenharmony_ci 576881f68fSopenharmony_cidef invoke_mount_fuse_drop_privileges(mnt_dir, name, options): 586881f68fSopenharmony_ci if os.getuid() != 0: 596881f68fSopenharmony_ci pytest.skip('drop_privileges requires root, skipping.') 606881f68fSopenharmony_ci 616881f68fSopenharmony_ci return invoke_mount_fuse(mnt_dir, name, options + ('drop_privileges',)) 626881f68fSopenharmony_ci 636881f68fSopenharmony_ciclass raii_tmpdir: 646881f68fSopenharmony_ci def __init__(self): 656881f68fSopenharmony_ci self.d = tempfile.mkdtemp() 666881f68fSopenharmony_ci 676881f68fSopenharmony_ci def __str__(self): 686881f68fSopenharmony_ci return str(self.d) 696881f68fSopenharmony_ci 706881f68fSopenharmony_ci def mkdir(self, path): 716881f68fSopenharmony_ci return py.path.local(str(self.d)).mkdir(path) 726881f68fSopenharmony_ci 736881f68fSopenharmony_ci@pytest.fixture 746881f68fSopenharmony_cidef short_tmpdir(): 756881f68fSopenharmony_ci return raii_tmpdir() 766881f68fSopenharmony_ci 776881f68fSopenharmony_cidef readdir_inode(dir): 786881f68fSopenharmony_ci cmd = base_cmdline + [ pjoin(basename, 'test', 'readdir_inode'), dir ] 796881f68fSopenharmony_ci with subprocess.Popen(cmd, stdout=subprocess.PIPE, 806881f68fSopenharmony_ci universal_newlines=True) as proc: 816881f68fSopenharmony_ci lines = proc.communicate()[0].splitlines() 826881f68fSopenharmony_ci lines.sort() 836881f68fSopenharmony_ci return lines 846881f68fSopenharmony_ci 856881f68fSopenharmony_ci 866881f68fSopenharmony_ci@pytest.mark.parametrize("cmdline_builder", (invoke_directly, invoke_mount_fuse, 876881f68fSopenharmony_ci invoke_mount_fuse_drop_privileges)) 886881f68fSopenharmony_ci@pytest.mark.parametrize("options", powerset(options)) 896881f68fSopenharmony_ci@pytest.mark.parametrize("name", ('hello', 'hello_ll')) 906881f68fSopenharmony_cidef test_hello(tmpdir, name, options, cmdline_builder, output_checker): 916881f68fSopenharmony_ci mnt_dir = str(tmpdir) 926881f68fSopenharmony_ci mount_process = subprocess.Popen( 936881f68fSopenharmony_ci cmdline_builder(mnt_dir, name, options), 946881f68fSopenharmony_ci stdout=output_checker.fd, stderr=output_checker.fd) 956881f68fSopenharmony_ci try: 966881f68fSopenharmony_ci wait_for_mount(mount_process, mnt_dir) 976881f68fSopenharmony_ci assert os.listdir(mnt_dir) == [ 'hello' ] 986881f68fSopenharmony_ci filename = pjoin(mnt_dir, 'hello') 996881f68fSopenharmony_ci with open(filename, 'r') as fh: 1006881f68fSopenharmony_ci assert fh.read() == 'Hello World!\n' 1016881f68fSopenharmony_ci with pytest.raises(IOError) as exc_info: 1026881f68fSopenharmony_ci open(filename, 'r+') 1036881f68fSopenharmony_ci assert exc_info.value.errno == errno.EACCES 1046881f68fSopenharmony_ci with pytest.raises(IOError) as exc_info: 1056881f68fSopenharmony_ci open(filename + 'does-not-exist', 'r+') 1066881f68fSopenharmony_ci assert exc_info.value.errno == errno.ENOENT 1076881f68fSopenharmony_ci if name == 'hello_ll': 1086881f68fSopenharmony_ci tst_xattr(mnt_dir) 1096881f68fSopenharmony_ci except: 1106881f68fSopenharmony_ci cleanup(mount_process, mnt_dir) 1116881f68fSopenharmony_ci raise 1126881f68fSopenharmony_ci else: 1136881f68fSopenharmony_ci umount(mount_process, mnt_dir) 1146881f68fSopenharmony_ci 1156881f68fSopenharmony_ci@pytest.mark.parametrize("writeback", (False, True)) 1166881f68fSopenharmony_ci@pytest.mark.parametrize("name", ('passthrough', 'passthrough_plus', 1176881f68fSopenharmony_ci 'passthrough_fh', 'passthrough_ll')) 1186881f68fSopenharmony_ci@pytest.mark.parametrize("debug", (False, True)) 1196881f68fSopenharmony_cidef test_passthrough(short_tmpdir, name, debug, output_checker, writeback): 1206881f68fSopenharmony_ci # Avoid false positives from libfuse debug messages 1216881f68fSopenharmony_ci if debug: 1226881f68fSopenharmony_ci output_checker.register_output(r'^ unique: [0-9]+, error: -[0-9]+ .+$', 1236881f68fSopenharmony_ci count=0) 1246881f68fSopenharmony_ci 1256881f68fSopenharmony_ci # test_syscalls prints "No error" under FreeBSD 1266881f68fSopenharmony_ci output_checker.register_output(r"^ \d\d \[[^\]]+ message: 'No error: 0'\]", 1276881f68fSopenharmony_ci count=0) 1286881f68fSopenharmony_ci 1296881f68fSopenharmony_ci mnt_dir = str(short_tmpdir.mkdir('mnt')) 1306881f68fSopenharmony_ci src_dir = str(short_tmpdir.mkdir('src')) 1316881f68fSopenharmony_ci 1326881f68fSopenharmony_ci if name == 'passthrough_plus': 1336881f68fSopenharmony_ci cmdline = base_cmdline + \ 1346881f68fSopenharmony_ci [ pjoin(basename, 'example', 'passthrough'), 1356881f68fSopenharmony_ci '--plus', '-f', mnt_dir ] 1366881f68fSopenharmony_ci else: 1376881f68fSopenharmony_ci cmdline = base_cmdline + \ 1386881f68fSopenharmony_ci [ pjoin(basename, 'example', name), 1396881f68fSopenharmony_ci '-f', mnt_dir ] 1406881f68fSopenharmony_ci if debug: 1416881f68fSopenharmony_ci cmdline.append('-d') 1426881f68fSopenharmony_ci 1436881f68fSopenharmony_ci if writeback: 1446881f68fSopenharmony_ci if name != 'passthrough_ll': 1456881f68fSopenharmony_ci pytest.skip('example does not support writeback caching') 1466881f68fSopenharmony_ci cmdline.append('-o') 1476881f68fSopenharmony_ci cmdline.append('writeback') 1486881f68fSopenharmony_ci 1496881f68fSopenharmony_ci mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 1506881f68fSopenharmony_ci stderr=output_checker.fd) 1516881f68fSopenharmony_ci try: 1526881f68fSopenharmony_ci wait_for_mount(mount_process, mnt_dir) 1536881f68fSopenharmony_ci work_dir = mnt_dir + src_dir 1546881f68fSopenharmony_ci 1556881f68fSopenharmony_ci tst_statvfs(work_dir) 1566881f68fSopenharmony_ci tst_readdir(src_dir, work_dir) 1576881f68fSopenharmony_ci tst_readdir_big(src_dir, work_dir) 1586881f68fSopenharmony_ci tst_open_read(src_dir, work_dir) 1596881f68fSopenharmony_ci tst_open_write(src_dir, work_dir) 1606881f68fSopenharmony_ci tst_create(work_dir) 1616881f68fSopenharmony_ci tst_passthrough(src_dir, work_dir) 1626881f68fSopenharmony_ci tst_append(src_dir, work_dir) 1636881f68fSopenharmony_ci tst_seek(src_dir, work_dir) 1646881f68fSopenharmony_ci tst_mkdir(work_dir) 1656881f68fSopenharmony_ci tst_rmdir(work_dir, src_dir) 1666881f68fSopenharmony_ci tst_unlink(work_dir, src_dir) 1676881f68fSopenharmony_ci tst_symlink(work_dir) 1686881f68fSopenharmony_ci if os.getuid() == 0: 1696881f68fSopenharmony_ci tst_chown(work_dir) 1706881f68fSopenharmony_ci 1716881f68fSopenharmony_ci # Underlying fs may not have full nanosecond resolution 1726881f68fSopenharmony_ci tst_utimens(work_dir, ns_tol=1000) 1736881f68fSopenharmony_ci 1746881f68fSopenharmony_ci tst_link(work_dir) 1756881f68fSopenharmony_ci tst_truncate_path(work_dir) 1766881f68fSopenharmony_ci tst_truncate_fd(work_dir) 1776881f68fSopenharmony_ci tst_open_unlink(work_dir) 1786881f68fSopenharmony_ci 1796881f68fSopenharmony_ci syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'), 1806881f68fSopenharmony_ci work_dir, ':' + src_dir ] 1816881f68fSopenharmony_ci if writeback: 1826881f68fSopenharmony_ci # When writeback caching is enabled, kernel has to open files for 1836881f68fSopenharmony_ci # reading even when userspace opens with O_WDONLY. This fails if the 1846881f68fSopenharmony_ci # filesystem process doesn't have special permission. 1856881f68fSopenharmony_ci syscall_test_cmd.append('-53') 1866881f68fSopenharmony_ci subprocess.check_call(syscall_test_cmd) 1876881f68fSopenharmony_ci except: 1886881f68fSopenharmony_ci cleanup(mount_process, mnt_dir) 1896881f68fSopenharmony_ci raise 1906881f68fSopenharmony_ci else: 1916881f68fSopenharmony_ci umount(mount_process, mnt_dir) 1926881f68fSopenharmony_ci 1936881f68fSopenharmony_ci@pytest.mark.parametrize("cache", (False, True)) 1946881f68fSopenharmony_cidef test_passthrough_hp(short_tmpdir, cache, output_checker): 1956881f68fSopenharmony_ci mnt_dir = str(short_tmpdir.mkdir('mnt')) 1966881f68fSopenharmony_ci src_dir = str(short_tmpdir.mkdir('src')) 1976881f68fSopenharmony_ci 1986881f68fSopenharmony_ci cmdline = base_cmdline + \ 1996881f68fSopenharmony_ci [ pjoin(basename, 'example', 'passthrough_hp'), 2006881f68fSopenharmony_ci src_dir, mnt_dir ] 2016881f68fSopenharmony_ci 2026881f68fSopenharmony_ci cmdline.append('--foreground') 2036881f68fSopenharmony_ci 2046881f68fSopenharmony_ci if not cache: 2056881f68fSopenharmony_ci cmdline.append('--nocache') 2066881f68fSopenharmony_ci 2076881f68fSopenharmony_ci mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 2086881f68fSopenharmony_ci stderr=output_checker.fd) 2096881f68fSopenharmony_ci try: 2106881f68fSopenharmony_ci wait_for_mount(mount_process, mnt_dir) 2116881f68fSopenharmony_ci 2126881f68fSopenharmony_ci tst_statvfs(mnt_dir) 2136881f68fSopenharmony_ci tst_readdir(src_dir, mnt_dir) 2146881f68fSopenharmony_ci tst_readdir_big(src_dir, mnt_dir) 2156881f68fSopenharmony_ci tst_open_read(src_dir, mnt_dir) 2166881f68fSopenharmony_ci tst_open_write(src_dir, mnt_dir) 2176881f68fSopenharmony_ci tst_create(mnt_dir) 2186881f68fSopenharmony_ci if not cache: 2196881f68fSopenharmony_ci tst_passthrough(src_dir, mnt_dir) 2206881f68fSopenharmony_ci tst_append(src_dir, mnt_dir) 2216881f68fSopenharmony_ci tst_seek(src_dir, mnt_dir) 2226881f68fSopenharmony_ci tst_mkdir(mnt_dir) 2236881f68fSopenharmony_ci if cache: 2246881f68fSopenharmony_ci # if cache is enabled, no operations should go through 2256881f68fSopenharmony_ci # src_dir as the cache will become stale. 2266881f68fSopenharmony_ci tst_rmdir(mnt_dir) 2276881f68fSopenharmony_ci tst_unlink(mnt_dir) 2286881f68fSopenharmony_ci else: 2296881f68fSopenharmony_ci tst_rmdir(mnt_dir, src_dir) 2306881f68fSopenharmony_ci tst_unlink(mnt_dir, src_dir) 2316881f68fSopenharmony_ci tst_symlink(mnt_dir) 2326881f68fSopenharmony_ci if os.getuid() == 0: 2336881f68fSopenharmony_ci tst_chown(mnt_dir) 2346881f68fSopenharmony_ci 2356881f68fSopenharmony_ci # Underlying fs may not have full nanosecond resolution 2366881f68fSopenharmony_ci tst_utimens(mnt_dir, ns_tol=1000) 2376881f68fSopenharmony_ci 2386881f68fSopenharmony_ci tst_link(mnt_dir) 2396881f68fSopenharmony_ci tst_truncate_path(mnt_dir) 2406881f68fSopenharmony_ci tst_truncate_fd(mnt_dir) 2416881f68fSopenharmony_ci tst_open_unlink(mnt_dir) 2426881f68fSopenharmony_ci 2436881f68fSopenharmony_ci # test_syscalls assumes that changes in source directory 2446881f68fSopenharmony_ci # will be reflected immediately in mountpoint, so we 2456881f68fSopenharmony_ci # can't use it. 2466881f68fSopenharmony_ci if not cache: 2476881f68fSopenharmony_ci syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'), 2486881f68fSopenharmony_ci mnt_dir, ':' + src_dir ] 2496881f68fSopenharmony_ci # unlinked testfiles check fails without kernel fix 2506881f68fSopenharmony_ci # "fuse: fix illegal access to inode with reused nodeid" 2516881f68fSopenharmony_ci # so opt-in for this test from kernel 5.14 2526881f68fSopenharmony_ci if LooseVersion(platform.release()) >= '5.14': 2536881f68fSopenharmony_ci syscall_test_cmd.append('-u') 2546881f68fSopenharmony_ci subprocess.check_call(syscall_test_cmd) 2556881f68fSopenharmony_ci except: 2566881f68fSopenharmony_ci cleanup(mount_process, mnt_dir) 2576881f68fSopenharmony_ci raise 2586881f68fSopenharmony_ci else: 2596881f68fSopenharmony_ci umount(mount_process, mnt_dir) 2606881f68fSopenharmony_ci 2616881f68fSopenharmony_ci 2626881f68fSopenharmony_ci@pytest.mark.skipif(fuse_proto < (7,11), 2636881f68fSopenharmony_ci reason='not supported by running kernel') 2646881f68fSopenharmony_cidef test_ioctl(tmpdir, output_checker): 2656881f68fSopenharmony_ci progname = pjoin(basename, 'example', 'ioctl') 2666881f68fSopenharmony_ci if not os.path.exists(progname): 2676881f68fSopenharmony_ci pytest.skip('%s not built' % os.path.basename(progname)) 2686881f68fSopenharmony_ci 2696881f68fSopenharmony_ci mnt_dir = str(tmpdir) 2706881f68fSopenharmony_ci testfile = pjoin(mnt_dir, 'fioc') 2716881f68fSopenharmony_ci cmdline = base_cmdline + [progname, '-f', mnt_dir ] 2726881f68fSopenharmony_ci mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 2736881f68fSopenharmony_ci stderr=output_checker.fd) 2746881f68fSopenharmony_ci try: 2756881f68fSopenharmony_ci wait_for_mount(mount_process, mnt_dir) 2766881f68fSopenharmony_ci 2776881f68fSopenharmony_ci cmdline = base_cmdline + \ 2786881f68fSopenharmony_ci [ pjoin(basename, 'example', 'ioctl_client'), 2796881f68fSopenharmony_ci testfile ] 2806881f68fSopenharmony_ci assert subprocess.check_output(cmdline) == b'0\n' 2816881f68fSopenharmony_ci with open(testfile, 'wb') as fh: 2826881f68fSopenharmony_ci fh.write(b'foobar') 2836881f68fSopenharmony_ci assert subprocess.check_output(cmdline) == b'6\n' 2846881f68fSopenharmony_ci subprocess.check_call(cmdline + [ '3' ]) 2856881f68fSopenharmony_ci with open(testfile, 'rb') as fh: 2866881f68fSopenharmony_ci assert fh.read()== b'foo' 2876881f68fSopenharmony_ci except: 2886881f68fSopenharmony_ci cleanup(mount_process, mnt_dir) 2896881f68fSopenharmony_ci raise 2906881f68fSopenharmony_ci else: 2916881f68fSopenharmony_ci umount(mount_process, mnt_dir) 2926881f68fSopenharmony_ci 2936881f68fSopenharmony_cidef test_poll(tmpdir, output_checker): 2946881f68fSopenharmony_ci mnt_dir = str(tmpdir) 2956881f68fSopenharmony_ci cmdline = base_cmdline + [pjoin(basename, 'example', 'poll'), 2966881f68fSopenharmony_ci '-f', mnt_dir ] 2976881f68fSopenharmony_ci mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 2986881f68fSopenharmony_ci stderr=output_checker.fd) 2996881f68fSopenharmony_ci try: 3006881f68fSopenharmony_ci wait_for_mount(mount_process, mnt_dir) 3016881f68fSopenharmony_ci cmdline = base_cmdline + \ 3026881f68fSopenharmony_ci [ pjoin(basename, 'example', 'poll_client') ] 3036881f68fSopenharmony_ci subprocess.check_call(cmdline, cwd=mnt_dir) 3046881f68fSopenharmony_ci except: 3056881f68fSopenharmony_ci cleanup(mount_process, mnt_dir) 3066881f68fSopenharmony_ci raise 3076881f68fSopenharmony_ci else: 3086881f68fSopenharmony_ci umount(mount_process, mnt_dir) 3096881f68fSopenharmony_ci 3106881f68fSopenharmony_cidef test_null(tmpdir, output_checker): 3116881f68fSopenharmony_ci progname = pjoin(basename, 'example', 'null') 3126881f68fSopenharmony_ci if not os.path.exists(progname): 3136881f68fSopenharmony_ci pytest.skip('%s not built' % os.path.basename(progname)) 3146881f68fSopenharmony_ci 3156881f68fSopenharmony_ci mnt_file = str(tmpdir) + '/file' 3166881f68fSopenharmony_ci with open(mnt_file, 'w') as fh: 3176881f68fSopenharmony_ci fh.write('dummy') 3186881f68fSopenharmony_ci cmdline = base_cmdline + [ progname, '-f', mnt_file ] 3196881f68fSopenharmony_ci mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 3206881f68fSopenharmony_ci stderr=output_checker.fd) 3216881f68fSopenharmony_ci def test_fn(name): 3226881f68fSopenharmony_ci return os.stat(name).st_size > 4000 3236881f68fSopenharmony_ci try: 3246881f68fSopenharmony_ci wait_for_mount(mount_process, mnt_file, test_fn) 3256881f68fSopenharmony_ci with open(mnt_file, 'rb') as fh: 3266881f68fSopenharmony_ci assert fh.read(382) == b'\0' * 382 3276881f68fSopenharmony_ci with open(mnt_file, 'wb') as fh: 3286881f68fSopenharmony_ci fh.write(b'whatever') 3296881f68fSopenharmony_ci except: 3306881f68fSopenharmony_ci cleanup(mount_process, mnt_file) 3316881f68fSopenharmony_ci raise 3326881f68fSopenharmony_ci else: 3336881f68fSopenharmony_ci umount(mount_process, mnt_file) 3346881f68fSopenharmony_ci 3356881f68fSopenharmony_ci 3366881f68fSopenharmony_ci@pytest.mark.skipif(fuse_proto < (7,12), 3376881f68fSopenharmony_ci reason='not supported by running kernel') 3386881f68fSopenharmony_ci@pytest.mark.parametrize("only_expire", ("invalidate_entries", "expire_entries")) 3396881f68fSopenharmony_ci@pytest.mark.parametrize("notify", (True, False)) 3406881f68fSopenharmony_cidef test_notify_inval_entry(tmpdir, only_expire, notify, output_checker): 3416881f68fSopenharmony_ci mnt_dir = str(tmpdir) 3426881f68fSopenharmony_ci cmdline = base_cmdline + \ 3436881f68fSopenharmony_ci [ pjoin(basename, 'example', 'notify_inval_entry'), 3446881f68fSopenharmony_ci '-f', '--update-interval=1', 3456881f68fSopenharmony_ci '--timeout=5', mnt_dir ] 3466881f68fSopenharmony_ci if not notify: 3476881f68fSopenharmony_ci cmdline.append('--no-notify') 3486881f68fSopenharmony_ci if only_expire == "expire_entries": 3496881f68fSopenharmony_ci cmdline.append('--only-expire') 3506881f68fSopenharmony_ci if fuse_proto < (7,38): 3516881f68fSopenharmony_ci pytest.skip('only-expire not supported by running kernel') 3526881f68fSopenharmony_ci mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 3536881f68fSopenharmony_ci stderr=output_checker.fd) 3546881f68fSopenharmony_ci try: 3556881f68fSopenharmony_ci wait_for_mount(mount_process, mnt_dir) 3566881f68fSopenharmony_ci fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0]) 3576881f68fSopenharmony_ci try: 3586881f68fSopenharmony_ci os.stat(fname) 3596881f68fSopenharmony_ci except FileNotFoundError: 3606881f68fSopenharmony_ci # We may have hit a race condition and issued 3616881f68fSopenharmony_ci # readdir just before the name changed 3626881f68fSopenharmony_ci fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0]) 3636881f68fSopenharmony_ci os.stat(fname) 3646881f68fSopenharmony_ci 3656881f68fSopenharmony_ci safe_sleep(2) 3666881f68fSopenharmony_ci if not notify: 3676881f68fSopenharmony_ci os.stat(fname) 3686881f68fSopenharmony_ci safe_sleep(5) 3696881f68fSopenharmony_ci with pytest.raises(FileNotFoundError): 3706881f68fSopenharmony_ci os.stat(fname) 3716881f68fSopenharmony_ci except: 3726881f68fSopenharmony_ci cleanup(mount_process, mnt_dir) 3736881f68fSopenharmony_ci raise 3746881f68fSopenharmony_ci else: 3756881f68fSopenharmony_ci umount(mount_process, mnt_dir) 3766881f68fSopenharmony_ci 3776881f68fSopenharmony_ci@pytest.mark.parametrize("intended_user", ('root', 'non_root')) 3786881f68fSopenharmony_cidef test_dev_auto_unmount(short_tmpdir, output_checker, intended_user): 3796881f68fSopenharmony_ci """Check that root can mount with dev and auto_unmount 3806881f68fSopenharmony_ci (but non-root cannot). 3816881f68fSopenharmony_ci Split into root vs non-root, so that the output of pytest 3826881f68fSopenharmony_ci makes clear what functionality is being tested.""" 3836881f68fSopenharmony_ci if os.getuid() == 0 and intended_user == 'non_root': 3846881f68fSopenharmony_ci pytest.skip('needs to run as non-root') 3856881f68fSopenharmony_ci if os.getuid() != 0 and intended_user == 'root': 3866881f68fSopenharmony_ci pytest.skip('needs to run as root') 3876881f68fSopenharmony_ci mnt_dir = str(short_tmpdir.mkdir('mnt')) 3886881f68fSopenharmony_ci src_dir = str('/dev') 3896881f68fSopenharmony_ci cmdline = base_cmdline + \ 3906881f68fSopenharmony_ci [ pjoin(basename, 'example', 'passthrough_ll'), 3916881f68fSopenharmony_ci '-o', f'source={src_dir},dev,auto_unmount', 3926881f68fSopenharmony_ci '-f', mnt_dir ] 3936881f68fSopenharmony_ci mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 3946881f68fSopenharmony_ci stderr=output_checker.fd) 3956881f68fSopenharmony_ci try: 3966881f68fSopenharmony_ci wait_for_mount(mount_process, mnt_dir) 3976881f68fSopenharmony_ci if os.getuid() == 0: 3986881f68fSopenharmony_ci open(pjoin(mnt_dir, 'null')).close() 3996881f68fSopenharmony_ci else: 4006881f68fSopenharmony_ci with pytest.raises(PermissionError): 4016881f68fSopenharmony_ci open(pjoin(mnt_dir, 'null')).close() 4026881f68fSopenharmony_ci except: 4036881f68fSopenharmony_ci cleanup(mount_process, mnt_dir) 4046881f68fSopenharmony_ci raise 4056881f68fSopenharmony_ci else: 4066881f68fSopenharmony_ci umount(mount_process, mnt_dir) 4076881f68fSopenharmony_ci 4086881f68fSopenharmony_ci@pytest.mark.skipif(os.getuid() != 0, 4096881f68fSopenharmony_ci reason='needs to run as root') 4106881f68fSopenharmony_cidef test_cuse(output_checker): 4116881f68fSopenharmony_ci 4126881f68fSopenharmony_ci # Valgrind warns about unknown ioctls, that's ok 4136881f68fSopenharmony_ci output_checker.register_output(r'^==([0-9]+).+unhandled ioctl.+\n' 4146881f68fSopenharmony_ci r'==\1== \s{3}.+\n' 4156881f68fSopenharmony_ci r'==\1== \s{3}.+$', count=0) 4166881f68fSopenharmony_ci 4176881f68fSopenharmony_ci devname = 'cuse-test-%d' % os.getpid() 4186881f68fSopenharmony_ci devpath = '/dev/%s' % devname 4196881f68fSopenharmony_ci cmdline = base_cmdline + \ 4206881f68fSopenharmony_ci [ pjoin(basename, 'example', 'cuse'), 4216881f68fSopenharmony_ci '-f', '--name=%s' % devname ] 4226881f68fSopenharmony_ci mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 4236881f68fSopenharmony_ci stderr=output_checker.fd) 4246881f68fSopenharmony_ci 4256881f68fSopenharmony_ci cmdline = base_cmdline + \ 4266881f68fSopenharmony_ci [ pjoin(basename, 'example', 'cuse_client'), 4276881f68fSopenharmony_ci devpath ] 4286881f68fSopenharmony_ci try: 4296881f68fSopenharmony_ci wait_for_mount(mount_process, devpath, 4306881f68fSopenharmony_ci test_fn=os.path.exists) 4316881f68fSopenharmony_ci assert subprocess.check_output(cmdline + ['s']) == b'0\n' 4326881f68fSopenharmony_ci data = b'some test data' 4336881f68fSopenharmony_ci off = 5 4346881f68fSopenharmony_ci proc = subprocess.Popen(cmdline + [ 'w', str(len(data)), str(off) ], 4356881f68fSopenharmony_ci stdin=subprocess.PIPE) 4366881f68fSopenharmony_ci proc.stdin.write(data) 4376881f68fSopenharmony_ci proc.stdin.close() 4386881f68fSopenharmony_ci assert proc.wait(timeout=10) == 0 4396881f68fSopenharmony_ci size = str(off + len(data)).encode() + b'\n' 4406881f68fSopenharmony_ci assert subprocess.check_output(cmdline + ['s']) == size 4416881f68fSopenharmony_ci out = subprocess.check_output( 4426881f68fSopenharmony_ci cmdline + [ 'r', str(off + len(data) + 2), '0' ]) 4436881f68fSopenharmony_ci assert out == (b'\0' * off) + data 4446881f68fSopenharmony_ci finally: 4456881f68fSopenharmony_ci mount_process.terminate() 4466881f68fSopenharmony_ci 4476881f68fSopenharmony_cidef test_release_unlink_race(tmpdir, output_checker): 4486881f68fSopenharmony_ci """test case for Issue #746 4496881f68fSopenharmony_ci 4506881f68fSopenharmony_ci If RELEASE and UNLINK opcodes are sent back to back, and fuse_fs_release() 4516881f68fSopenharmony_ci and fuse_fs_rename() are slow to execute, UNLINK will run while RELEASE is 4526881f68fSopenharmony_ci still executing. UNLINK will try to rename the file and, while the rename 4536881f68fSopenharmony_ci is happening, the RELEASE will finish executing. As a result, RELEASE will 4546881f68fSopenharmony_ci not detect in time that UNLINK has happened, and UNLINK will not detect in 4556881f68fSopenharmony_ci time that RELEASE has happened. 4566881f68fSopenharmony_ci 4576881f68fSopenharmony_ci 4586881f68fSopenharmony_ci NOTE: This is triggered only when nullpath_ok is set. 4596881f68fSopenharmony_ci 4606881f68fSopenharmony_ci If it is NOT SET then get_path_nullok() called by fuse_lib_release() will 4616881f68fSopenharmony_ci call get_path_common() and lock the path, and then the fuse_lib_unlink() 4626881f68fSopenharmony_ci will wait for the path to be unlocked before executing and thus synchronise 4636881f68fSopenharmony_ci with fuse_lib_release(). 4646881f68fSopenharmony_ci 4656881f68fSopenharmony_ci If it is SET then get_path_nullok() will just set the path to null and 4666881f68fSopenharmony_ci return without locking anything and thus allowing fuse_lib_unlink() to 4676881f68fSopenharmony_ci eventually execute unimpeded while fuse_lib_release() is still running. 4686881f68fSopenharmony_ci """ 4696881f68fSopenharmony_ci 4706881f68fSopenharmony_ci fuse_mountpoint = str(tmpdir) 4716881f68fSopenharmony_ci 4726881f68fSopenharmony_ci fuse_binary_command = base_cmdline + \ 4736881f68fSopenharmony_ci [ pjoin(basename, 'test', 'release_unlink_race'), 4746881f68fSopenharmony_ci "-f", fuse_mountpoint] 4756881f68fSopenharmony_ci 4766881f68fSopenharmony_ci fuse_process = subprocess.Popen(fuse_binary_command, 4776881f68fSopenharmony_ci stdout=output_checker.fd, 4786881f68fSopenharmony_ci stderr=output_checker.fd) 4796881f68fSopenharmony_ci 4806881f68fSopenharmony_ci try: 4816881f68fSopenharmony_ci wait_for_mount(fuse_process, fuse_mountpoint) 4826881f68fSopenharmony_ci 4836881f68fSopenharmony_ci temp_dir = tempfile.TemporaryDirectory(dir="/tmp/") 4846881f68fSopenharmony_ci temp_dir_path = temp_dir.name 4856881f68fSopenharmony_ci 4866881f68fSopenharmony_ci fuse_temp_file, fuse_temp_file_path = tempfile.mkstemp(dir=(fuse_mountpoint + temp_dir_path)) 4876881f68fSopenharmony_ci 4886881f68fSopenharmony_ci os.close(fuse_temp_file) 4896881f68fSopenharmony_ci os.unlink(fuse_temp_file_path) 4906881f68fSopenharmony_ci 4916881f68fSopenharmony_ci # needed for slow CI/CD pipelines for unlink OP to complete processing 4926881f68fSopenharmony_ci safe_sleep(3) 4936881f68fSopenharmony_ci 4946881f68fSopenharmony_ci assert os.listdir(temp_dir_path) == [] 4956881f68fSopenharmony_ci 4966881f68fSopenharmony_ci except: 4976881f68fSopenharmony_ci temp_dir.cleanup() 4986881f68fSopenharmony_ci cleanup(fuse_process, fuse_mountpoint) 4996881f68fSopenharmony_ci raise 5006881f68fSopenharmony_ci 5016881f68fSopenharmony_ci else: 5026881f68fSopenharmony_ci temp_dir.cleanup() 5036881f68fSopenharmony_ci umount(fuse_process, fuse_mountpoint) 5046881f68fSopenharmony_ci 5056881f68fSopenharmony_ci 5066881f68fSopenharmony_ci@contextmanager 5076881f68fSopenharmony_cidef os_open(name, flags): 5086881f68fSopenharmony_ci fd = os.open(name, flags) 5096881f68fSopenharmony_ci try: 5106881f68fSopenharmony_ci yield fd 5116881f68fSopenharmony_ci finally: 5126881f68fSopenharmony_ci os.close(fd) 5136881f68fSopenharmony_ci 5146881f68fSopenharmony_cidef os_create(name): 5156881f68fSopenharmony_ci os.close(os.open(name, os.O_CREAT | os.O_RDWR)) 5166881f68fSopenharmony_ci 5176881f68fSopenharmony_cidef tst_unlink(mnt_dir, src_dir=None): 5186881f68fSopenharmony_ci name = name_generator() 5196881f68fSopenharmony_ci fullname = mnt_dir + "/" + name 5206881f68fSopenharmony_ci srcname = fullname 5216881f68fSopenharmony_ci if src_dir is not None: 5226881f68fSopenharmony_ci srcname = pjoin(src_dir, name) 5236881f68fSopenharmony_ci with open(srcname, 'wb') as fh: 5246881f68fSopenharmony_ci fh.write(b'hello') 5256881f68fSopenharmony_ci assert name in os.listdir(mnt_dir) 5266881f68fSopenharmony_ci os.unlink(fullname) 5276881f68fSopenharmony_ci with pytest.raises(OSError) as exc_info: 5286881f68fSopenharmony_ci os.stat(fullname) 5296881f68fSopenharmony_ci assert exc_info.value.errno == errno.ENOENT 5306881f68fSopenharmony_ci assert name not in os.listdir(mnt_dir) 5316881f68fSopenharmony_ci 5326881f68fSopenharmony_cidef tst_mkdir(mnt_dir): 5336881f68fSopenharmony_ci dirname = name_generator() 5346881f68fSopenharmony_ci fullname = mnt_dir + "/" + dirname 5356881f68fSopenharmony_ci os.mkdir(fullname) 5366881f68fSopenharmony_ci fstat = os.stat(fullname) 5376881f68fSopenharmony_ci assert stat.S_ISDIR(fstat.st_mode) 5386881f68fSopenharmony_ci assert os.listdir(fullname) == [] 5396881f68fSopenharmony_ci # Some filesystem (e.g. BTRFS) don't track st_nlink for directories 5406881f68fSopenharmony_ci assert fstat.st_nlink in (1,2) 5416881f68fSopenharmony_ci assert dirname in os.listdir(mnt_dir) 5426881f68fSopenharmony_ci 5436881f68fSopenharmony_cidef tst_rmdir(mnt_dir, src_dir=None): 5446881f68fSopenharmony_ci name = name_generator() 5456881f68fSopenharmony_ci fullname = mnt_dir + "/" + name 5466881f68fSopenharmony_ci srcname = fullname 5476881f68fSopenharmony_ci if src_dir is not None: 5486881f68fSopenharmony_ci srcname = pjoin(src_dir, name) 5496881f68fSopenharmony_ci os.mkdir(srcname) 5506881f68fSopenharmony_ci assert name in os.listdir(mnt_dir) 5516881f68fSopenharmony_ci os.rmdir(fullname) 5526881f68fSopenharmony_ci with pytest.raises(OSError) as exc_info: 5536881f68fSopenharmony_ci os.stat(fullname) 5546881f68fSopenharmony_ci assert exc_info.value.errno == errno.ENOENT 5556881f68fSopenharmony_ci assert name not in os.listdir(mnt_dir) 5566881f68fSopenharmony_ci 5576881f68fSopenharmony_cidef tst_symlink(mnt_dir): 5586881f68fSopenharmony_ci linkname = name_generator() 5596881f68fSopenharmony_ci fullname = mnt_dir + "/" + linkname 5606881f68fSopenharmony_ci os.symlink("/imaginary/dest", fullname) 5616881f68fSopenharmony_ci fstat = os.lstat(fullname) 5626881f68fSopenharmony_ci assert stat.S_ISLNK(fstat.st_mode) 5636881f68fSopenharmony_ci assert os.readlink(fullname) == "/imaginary/dest" 5646881f68fSopenharmony_ci assert fstat.st_nlink == 1 5656881f68fSopenharmony_ci assert linkname in os.listdir(mnt_dir) 5666881f68fSopenharmony_ci 5676881f68fSopenharmony_cidef tst_create(mnt_dir): 5686881f68fSopenharmony_ci name = name_generator() 5696881f68fSopenharmony_ci fullname = pjoin(mnt_dir, name) 5706881f68fSopenharmony_ci with pytest.raises(OSError) as exc_info: 5716881f68fSopenharmony_ci os.stat(fullname) 5726881f68fSopenharmony_ci assert exc_info.value.errno == errno.ENOENT 5736881f68fSopenharmony_ci assert name not in os.listdir(mnt_dir) 5746881f68fSopenharmony_ci 5756881f68fSopenharmony_ci fd = os.open(fullname, os.O_CREAT | os.O_RDWR) 5766881f68fSopenharmony_ci os.close(fd) 5776881f68fSopenharmony_ci 5786881f68fSopenharmony_ci assert name in os.listdir(mnt_dir) 5796881f68fSopenharmony_ci fstat = os.lstat(fullname) 5806881f68fSopenharmony_ci assert stat.S_ISREG(fstat.st_mode) 5816881f68fSopenharmony_ci assert fstat.st_nlink == 1 5826881f68fSopenharmony_ci assert fstat.st_size == 0 5836881f68fSopenharmony_ci 5846881f68fSopenharmony_cidef tst_chown(mnt_dir): 5856881f68fSopenharmony_ci filename = pjoin(mnt_dir, name_generator()) 5866881f68fSopenharmony_ci os.mkdir(filename) 5876881f68fSopenharmony_ci fstat = os.lstat(filename) 5886881f68fSopenharmony_ci uid = fstat.st_uid 5896881f68fSopenharmony_ci gid = fstat.st_gid 5906881f68fSopenharmony_ci 5916881f68fSopenharmony_ci uid_new = uid + 1 5926881f68fSopenharmony_ci os.chown(filename, uid_new, -1) 5936881f68fSopenharmony_ci fstat = os.lstat(filename) 5946881f68fSopenharmony_ci assert fstat.st_uid == uid_new 5956881f68fSopenharmony_ci assert fstat.st_gid == gid 5966881f68fSopenharmony_ci 5976881f68fSopenharmony_ci gid_new = gid + 1 5986881f68fSopenharmony_ci os.chown(filename, -1, gid_new) 5996881f68fSopenharmony_ci fstat = os.lstat(filename) 6006881f68fSopenharmony_ci assert fstat.st_uid == uid_new 6016881f68fSopenharmony_ci assert fstat.st_gid == gid_new 6026881f68fSopenharmony_ci 6036881f68fSopenharmony_cidef tst_open_read(src_dir, mnt_dir): 6046881f68fSopenharmony_ci name = name_generator() 6056881f68fSopenharmony_ci with open(pjoin(src_dir, name), 'wb') as fh_out, \ 6066881f68fSopenharmony_ci open(TEST_FILE, 'rb') as fh_in: 6076881f68fSopenharmony_ci shutil.copyfileobj(fh_in, fh_out) 6086881f68fSopenharmony_ci 6096881f68fSopenharmony_ci assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False) 6106881f68fSopenharmony_ci 6116881f68fSopenharmony_cidef tst_open_write(src_dir, mnt_dir): 6126881f68fSopenharmony_ci name = name_generator() 6136881f68fSopenharmony_ci os_create(pjoin(src_dir, name)) 6146881f68fSopenharmony_ci fullname = pjoin(mnt_dir, name) 6156881f68fSopenharmony_ci with open(fullname, 'wb') as fh_out, \ 6166881f68fSopenharmony_ci open(TEST_FILE, 'rb') as fh_in: 6176881f68fSopenharmony_ci shutil.copyfileobj(fh_in, fh_out) 6186881f68fSopenharmony_ci 6196881f68fSopenharmony_ci assert filecmp.cmp(fullname, TEST_FILE, False) 6206881f68fSopenharmony_ci 6216881f68fSopenharmony_cidef tst_append(src_dir, mnt_dir): 6226881f68fSopenharmony_ci name = name_generator() 6236881f68fSopenharmony_ci os_create(pjoin(src_dir, name)) 6246881f68fSopenharmony_ci fullname = pjoin(mnt_dir, name) 6256881f68fSopenharmony_ci with os_open(fullname, os.O_WRONLY) as fd: 6266881f68fSopenharmony_ci os.write(fd, b'foo\n') 6276881f68fSopenharmony_ci with os_open(fullname, os.O_WRONLY|os.O_APPEND) as fd: 6286881f68fSopenharmony_ci os.write(fd, b'bar\n') 6296881f68fSopenharmony_ci 6306881f68fSopenharmony_ci with open(fullname, 'rb') as fh: 6316881f68fSopenharmony_ci assert fh.read() == b'foo\nbar\n' 6326881f68fSopenharmony_ci 6336881f68fSopenharmony_cidef tst_seek(src_dir, mnt_dir): 6346881f68fSopenharmony_ci name = name_generator() 6356881f68fSopenharmony_ci os_create(pjoin(src_dir, name)) 6366881f68fSopenharmony_ci fullname = pjoin(mnt_dir, name) 6376881f68fSopenharmony_ci with os_open(fullname, os.O_WRONLY) as fd: 6386881f68fSopenharmony_ci os.lseek(fd, 1, os.SEEK_SET) 6396881f68fSopenharmony_ci os.write(fd, b'foobar\n') 6406881f68fSopenharmony_ci with os_open(fullname, os.O_WRONLY) as fd: 6416881f68fSopenharmony_ci os.lseek(fd, 4, os.SEEK_SET) 6426881f68fSopenharmony_ci os.write(fd, b'com') 6436881f68fSopenharmony_ci 6446881f68fSopenharmony_ci with open(fullname, 'rb') as fh: 6456881f68fSopenharmony_ci assert fh.read() == b'\0foocom\n' 6466881f68fSopenharmony_ci 6476881f68fSopenharmony_cidef tst_open_unlink(mnt_dir): 6486881f68fSopenharmony_ci name = pjoin(mnt_dir, name_generator()) 6496881f68fSopenharmony_ci data1 = b'foo' 6506881f68fSopenharmony_ci data2 = b'bar' 6516881f68fSopenharmony_ci fullname = pjoin(mnt_dir, name) 6526881f68fSopenharmony_ci with open(fullname, 'wb+', buffering=0) as fh: 6536881f68fSopenharmony_ci fh.write(data1) 6546881f68fSopenharmony_ci os.unlink(fullname) 6556881f68fSopenharmony_ci with pytest.raises(OSError) as exc_info: 6566881f68fSopenharmony_ci os.stat(fullname) 6576881f68fSopenharmony_ci assert exc_info.value.errno == errno.ENOENT 6586881f68fSopenharmony_ci assert name not in os.listdir(mnt_dir) 6596881f68fSopenharmony_ci fh.write(data2) 6606881f68fSopenharmony_ci fh.seek(0) 6616881f68fSopenharmony_ci assert fh.read() == data1+data2 6626881f68fSopenharmony_ci 6636881f68fSopenharmony_cidef tst_statvfs(mnt_dir): 6646881f68fSopenharmony_ci os.statvfs(mnt_dir) 6656881f68fSopenharmony_ci 6666881f68fSopenharmony_cidef tst_link(mnt_dir): 6676881f68fSopenharmony_ci name1 = pjoin(mnt_dir, name_generator()) 6686881f68fSopenharmony_ci name2 = pjoin(mnt_dir, name_generator()) 6696881f68fSopenharmony_ci shutil.copyfile(TEST_FILE, name1) 6706881f68fSopenharmony_ci assert filecmp.cmp(name1, TEST_FILE, False) 6716881f68fSopenharmony_ci 6726881f68fSopenharmony_ci fstat1 = os.lstat(name1) 6736881f68fSopenharmony_ci assert fstat1.st_nlink == 1 6746881f68fSopenharmony_ci 6756881f68fSopenharmony_ci os.link(name1, name2) 6766881f68fSopenharmony_ci 6776881f68fSopenharmony_ci fstat1 = os.lstat(name1) 6786881f68fSopenharmony_ci fstat2 = os.lstat(name2) 6796881f68fSopenharmony_ci assert fstat1 == fstat2 6806881f68fSopenharmony_ci assert fstat1.st_nlink == 2 6816881f68fSopenharmony_ci assert os.path.basename(name2) in os.listdir(mnt_dir) 6826881f68fSopenharmony_ci assert filecmp.cmp(name1, name2, False) 6836881f68fSopenharmony_ci 6846881f68fSopenharmony_ci # Since RELEASE requests are asynchronous, it is possible that 6856881f68fSopenharmony_ci # libfuse still considers the file to be open at this point 6866881f68fSopenharmony_ci # and (since -o hard_remove is not used) renames it instead of 6876881f68fSopenharmony_ci # deleting it. In that case, the following lstat() call will 6886881f68fSopenharmony_ci # still report an st_nlink value of 2 (cf. issue #157). 6896881f68fSopenharmony_ci os.unlink(name2) 6906881f68fSopenharmony_ci 6916881f68fSopenharmony_ci assert os.path.basename(name2) not in os.listdir(mnt_dir) 6926881f68fSopenharmony_ci with pytest.raises(FileNotFoundError): 6936881f68fSopenharmony_ci os.lstat(name2) 6946881f68fSopenharmony_ci 6956881f68fSopenharmony_ci # See above, we may have to wait until RELEASE has been 6966881f68fSopenharmony_ci # received before the st_nlink value is correct. 6976881f68fSopenharmony_ci maxwait = time.time() + 2 6986881f68fSopenharmony_ci fstat1 = os.lstat(name1) 6996881f68fSopenharmony_ci while fstat1.st_nlink == 2 and time.time() < maxwait: 7006881f68fSopenharmony_ci fstat1 = os.lstat(name1) 7016881f68fSopenharmony_ci time.sleep(0.1) 7026881f68fSopenharmony_ci assert fstat1.st_nlink == 1 7036881f68fSopenharmony_ci 7046881f68fSopenharmony_ci os.unlink(name1) 7056881f68fSopenharmony_ci 7066881f68fSopenharmony_cidef tst_readdir(src_dir, mnt_dir): 7076881f68fSopenharmony_ci newdir = name_generator() 7086881f68fSopenharmony_ci 7096881f68fSopenharmony_ci src_newdir = pjoin(src_dir, newdir) 7106881f68fSopenharmony_ci mnt_newdir = pjoin(mnt_dir, newdir) 7116881f68fSopenharmony_ci file_ = src_newdir + "/" + name_generator() 7126881f68fSopenharmony_ci subdir = src_newdir + "/" + name_generator() 7136881f68fSopenharmony_ci subfile = subdir + "/" + name_generator() 7146881f68fSopenharmony_ci 7156881f68fSopenharmony_ci os.mkdir(src_newdir) 7166881f68fSopenharmony_ci shutil.copyfile(TEST_FILE, file_) 7176881f68fSopenharmony_ci os.mkdir(subdir) 7186881f68fSopenharmony_ci shutil.copyfile(TEST_FILE, subfile) 7196881f68fSopenharmony_ci 7206881f68fSopenharmony_ci listdir_is = os.listdir(mnt_newdir) 7216881f68fSopenharmony_ci listdir_is.sort() 7226881f68fSopenharmony_ci listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ] 7236881f68fSopenharmony_ci listdir_should.sort() 7246881f68fSopenharmony_ci assert listdir_is == listdir_should 7256881f68fSopenharmony_ci 7266881f68fSopenharmony_ci inodes_is = readdir_inode(mnt_newdir) 7276881f68fSopenharmony_ci inodes_should = readdir_inode(src_newdir) 7286881f68fSopenharmony_ci assert inodes_is == inodes_should 7296881f68fSopenharmony_ci 7306881f68fSopenharmony_ci os.unlink(file_) 7316881f68fSopenharmony_ci os.unlink(subfile) 7326881f68fSopenharmony_ci os.rmdir(subdir) 7336881f68fSopenharmony_ci os.rmdir(src_newdir) 7346881f68fSopenharmony_ci 7356881f68fSopenharmony_cidef tst_readdir_big(src_dir, mnt_dir): 7366881f68fSopenharmony_ci 7376881f68fSopenharmony_ci # Add enough entries so that readdir needs to be called 7386881f68fSopenharmony_ci # multiple times. 7396881f68fSopenharmony_ci fnames = [] 7406881f68fSopenharmony_ci for i in range(500): 7416881f68fSopenharmony_ci fname = ('A rather long filename to make sure that we ' 7426881f68fSopenharmony_ci 'fill up the buffer - ' * 3) + str(i) 7436881f68fSopenharmony_ci with open(pjoin(src_dir, fname), 'w') as fh: 7446881f68fSopenharmony_ci fh.write('File %d' % i) 7456881f68fSopenharmony_ci fnames.append(fname) 7466881f68fSopenharmony_ci 7476881f68fSopenharmony_ci listdir_is = sorted(os.listdir(mnt_dir)) 7486881f68fSopenharmony_ci listdir_should = sorted(os.listdir(src_dir)) 7496881f68fSopenharmony_ci assert listdir_is == listdir_should 7506881f68fSopenharmony_ci 7516881f68fSopenharmony_ci inodes_is = readdir_inode(mnt_dir) 7526881f68fSopenharmony_ci inodes_should = readdir_inode(src_dir) 7536881f68fSopenharmony_ci assert inodes_is == inodes_should 7546881f68fSopenharmony_ci 7556881f68fSopenharmony_ci for fname in fnames: 7566881f68fSopenharmony_ci stat_src = os.stat(pjoin(src_dir, fname)) 7576881f68fSopenharmony_ci stat_mnt = os.stat(pjoin(mnt_dir, fname)) 7586881f68fSopenharmony_ci assert stat_src.st_ino == stat_mnt.st_ino 7596881f68fSopenharmony_ci assert stat_src.st_mtime == stat_mnt.st_mtime 7606881f68fSopenharmony_ci assert stat_src.st_ctime == stat_mnt.st_ctime 7616881f68fSopenharmony_ci assert stat_src.st_size == stat_mnt.st_size 7626881f68fSopenharmony_ci os.unlink(pjoin(src_dir, fname)) 7636881f68fSopenharmony_ci 7646881f68fSopenharmony_cidef tst_truncate_path(mnt_dir): 7656881f68fSopenharmony_ci assert len(TEST_DATA) > 1024 7666881f68fSopenharmony_ci 7676881f68fSopenharmony_ci filename = pjoin(mnt_dir, name_generator()) 7686881f68fSopenharmony_ci with open(filename, 'wb') as fh: 7696881f68fSopenharmony_ci fh.write(TEST_DATA) 7706881f68fSopenharmony_ci 7716881f68fSopenharmony_ci fstat = os.stat(filename) 7726881f68fSopenharmony_ci size = fstat.st_size 7736881f68fSopenharmony_ci assert size == len(TEST_DATA) 7746881f68fSopenharmony_ci 7756881f68fSopenharmony_ci # Add zeros at the end 7766881f68fSopenharmony_ci os.truncate(filename, size + 1024) 7776881f68fSopenharmony_ci assert os.stat(filename).st_size == size + 1024 7786881f68fSopenharmony_ci with open(filename, 'rb') as fh: 7796881f68fSopenharmony_ci assert fh.read(size) == TEST_DATA 7806881f68fSopenharmony_ci assert fh.read(1025) == b'\0' * 1024 7816881f68fSopenharmony_ci 7826881f68fSopenharmony_ci # Truncate data 7836881f68fSopenharmony_ci os.truncate(filename, size - 1024) 7846881f68fSopenharmony_ci assert os.stat(filename).st_size == size - 1024 7856881f68fSopenharmony_ci with open(filename, 'rb') as fh: 7866881f68fSopenharmony_ci assert fh.read(size) == TEST_DATA[:size-1024] 7876881f68fSopenharmony_ci 7886881f68fSopenharmony_ci os.unlink(filename) 7896881f68fSopenharmony_ci 7906881f68fSopenharmony_cidef tst_truncate_fd(mnt_dir): 7916881f68fSopenharmony_ci assert len(TEST_DATA) > 1024 7926881f68fSopenharmony_ci with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh: 7936881f68fSopenharmony_ci fd = fh.fileno() 7946881f68fSopenharmony_ci fh.write(TEST_DATA) 7956881f68fSopenharmony_ci fstat = os.fstat(fd) 7966881f68fSopenharmony_ci size = fstat.st_size 7976881f68fSopenharmony_ci assert size == len(TEST_DATA) 7986881f68fSopenharmony_ci 7996881f68fSopenharmony_ci # Add zeros at the end 8006881f68fSopenharmony_ci os.ftruncate(fd, size + 1024) 8016881f68fSopenharmony_ci assert os.fstat(fd).st_size == size + 1024 8026881f68fSopenharmony_ci fh.seek(0) 8036881f68fSopenharmony_ci assert fh.read(size) == TEST_DATA 8046881f68fSopenharmony_ci assert fh.read(1025) == b'\0' * 1024 8056881f68fSopenharmony_ci 8066881f68fSopenharmony_ci # Truncate data 8076881f68fSopenharmony_ci os.ftruncate(fd, size - 1024) 8086881f68fSopenharmony_ci assert os.fstat(fd).st_size == size - 1024 8096881f68fSopenharmony_ci fh.seek(0) 8106881f68fSopenharmony_ci assert fh.read(size) == TEST_DATA[:size-1024] 8116881f68fSopenharmony_ci 8126881f68fSopenharmony_cidef tst_utimens(mnt_dir, ns_tol=0): 8136881f68fSopenharmony_ci filename = pjoin(mnt_dir, name_generator()) 8146881f68fSopenharmony_ci os.mkdir(filename) 8156881f68fSopenharmony_ci fstat = os.lstat(filename) 8166881f68fSopenharmony_ci 8176881f68fSopenharmony_ci atime = fstat.st_atime + 42.28 8186881f68fSopenharmony_ci mtime = fstat.st_mtime - 42.23 8196881f68fSopenharmony_ci if sys.version_info < (3,3): 8206881f68fSopenharmony_ci os.utime(filename, (atime, mtime)) 8216881f68fSopenharmony_ci else: 8226881f68fSopenharmony_ci atime_ns = fstat.st_atime_ns + int(42.28*1e9) 8236881f68fSopenharmony_ci mtime_ns = fstat.st_mtime_ns - int(42.23*1e9) 8246881f68fSopenharmony_ci os.utime(filename, None, ns=(atime_ns, mtime_ns)) 8256881f68fSopenharmony_ci 8266881f68fSopenharmony_ci fstat = os.lstat(filename) 8276881f68fSopenharmony_ci 8286881f68fSopenharmony_ci assert abs(fstat.st_atime - atime) < 1 8296881f68fSopenharmony_ci assert abs(fstat.st_mtime - mtime) < 1 8306881f68fSopenharmony_ci if sys.version_info >= (3,3): 8316881f68fSopenharmony_ci assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol 8326881f68fSopenharmony_ci assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol 8336881f68fSopenharmony_ci 8346881f68fSopenharmony_cidef tst_passthrough(src_dir, mnt_dir): 8356881f68fSopenharmony_ci name = name_generator() 8366881f68fSopenharmony_ci src_name = pjoin(src_dir, name) 8376881f68fSopenharmony_ci mnt_name = pjoin(src_dir, name) 8386881f68fSopenharmony_ci assert name not in os.listdir(src_dir) 8396881f68fSopenharmony_ci assert name not in os.listdir(mnt_dir) 8406881f68fSopenharmony_ci with open(src_name, 'w') as fh: 8416881f68fSopenharmony_ci fh.write('Hello, world') 8426881f68fSopenharmony_ci assert name in os.listdir(src_dir) 8436881f68fSopenharmony_ci assert name in os.listdir(mnt_dir) 8446881f68fSopenharmony_ci assert os.stat(src_name) == os.stat(mnt_name) 8456881f68fSopenharmony_ci 8466881f68fSopenharmony_ci name = name_generator() 8476881f68fSopenharmony_ci src_name = pjoin(src_dir, name) 8486881f68fSopenharmony_ci mnt_name = pjoin(src_dir, name) 8496881f68fSopenharmony_ci assert name not in os.listdir(src_dir) 8506881f68fSopenharmony_ci assert name not in os.listdir(mnt_dir) 8516881f68fSopenharmony_ci with open(mnt_name, 'w') as fh: 8526881f68fSopenharmony_ci fh.write('Hello, world') 8536881f68fSopenharmony_ci assert name in os.listdir(src_dir) 8546881f68fSopenharmony_ci assert name in os.listdir(mnt_dir) 8556881f68fSopenharmony_ci assert os.stat(src_name) == os.stat(mnt_name) 8566881f68fSopenharmony_ci 8576881f68fSopenharmony_ci 8586881f68fSopenharmony_cidef tst_xattr(mnt_dir): 8596881f68fSopenharmony_ci path = os.path.join(mnt_dir, 'hello') 8606881f68fSopenharmony_ci os.setxattr(path, b'hello_ll_setxattr_name', b'hello_ll_setxattr_value') 8616881f68fSopenharmony_ci assert os.getxattr(path, b'hello_ll_getxattr_name') == b'hello_ll_getxattr_value' 8626881f68fSopenharmony_ci os.removexattr(path, b'hello_ll_removexattr_name') 8636881f68fSopenharmony_ci 8646881f68fSopenharmony_ci 8656881f68fSopenharmony_ci# avoid warning about unused import 8666881f68fSopenharmony_ciassert test_printcap 867