16881f68fSopenharmony_ci#!/usr/bin/env python3
26881f68fSopenharmony_ci
36881f68fSopenharmony_ciif __name__ == '__main__':
46881f68fSopenharmony_ci    import pytest
56881f68fSopenharmony_ci    import sys
66881f68fSopenharmony_ci    sys.exit(pytest.main([__file__] + sys.argv[1:]))
76881f68fSopenharmony_ci
86881f68fSopenharmony_ciimport subprocess
96881f68fSopenharmony_ciimport os
106881f68fSopenharmony_ciimport sys
116881f68fSopenharmony_ciimport py
126881f68fSopenharmony_ciimport pytest
136881f68fSopenharmony_ciimport stat
146881f68fSopenharmony_ciimport shutil
156881f68fSopenharmony_ciimport filecmp
166881f68fSopenharmony_ciimport tempfile
176881f68fSopenharmony_ciimport time
186881f68fSopenharmony_ciimport errno
196881f68fSopenharmony_ciimport sys
206881f68fSopenharmony_ciimport platform
216881f68fSopenharmony_cifrom looseversion import LooseVersion
226881f68fSopenharmony_cifrom tempfile import NamedTemporaryFile
236881f68fSopenharmony_cifrom contextlib import contextmanager
246881f68fSopenharmony_cifrom util import (wait_for_mount, umount, cleanup, base_cmdline,
256881f68fSopenharmony_ci                  safe_sleep, basename, fuse_test_marker, test_printcap,
266881f68fSopenharmony_ci                  fuse_proto, powerset)
276881f68fSopenharmony_cifrom os.path import join as pjoin
286881f68fSopenharmony_ci
296881f68fSopenharmony_cipytestmark = fuse_test_marker()
306881f68fSopenharmony_ci
316881f68fSopenharmony_ciTEST_FILE = __file__
326881f68fSopenharmony_ci
336881f68fSopenharmony_ciwith open(TEST_FILE, 'rb') as fh:
346881f68fSopenharmony_ci    TEST_DATA = fh.read()
356881f68fSopenharmony_ci
366881f68fSopenharmony_cidef name_generator(__ctr=[0]):
376881f68fSopenharmony_ci    __ctr[0] += 1
386881f68fSopenharmony_ci    return 'testfile_%d' % __ctr[0]
396881f68fSopenharmony_ci
406881f68fSopenharmony_cioptions = []
416881f68fSopenharmony_ciif sys.platform == 'linux':
426881f68fSopenharmony_ci    options.append('clone_fd')
436881f68fSopenharmony_ci
446881f68fSopenharmony_cidef invoke_directly(mnt_dir, name, options):
456881f68fSopenharmony_ci    cmdline = base_cmdline + [ pjoin(basename, 'example', name),
466881f68fSopenharmony_ci                               '-f', mnt_dir, '-o', ','.join(options) ]
476881f68fSopenharmony_ci    if name == 'hello_ll':
486881f68fSopenharmony_ci        # supports single-threading only
496881f68fSopenharmony_ci        cmdline.append('-s')
506881f68fSopenharmony_ci
516881f68fSopenharmony_ci    return cmdline
526881f68fSopenharmony_ci
536881f68fSopenharmony_cidef invoke_mount_fuse(mnt_dir, name, options):
546881f68fSopenharmony_ci    return base_cmdline + [ pjoin(basename, 'util', 'mount.fuse3'),
556881f68fSopenharmony_ci                            name, mnt_dir, '-o', ','.join(options) ]
566881f68fSopenharmony_ci
576881f68fSopenharmony_cidef invoke_mount_fuse_drop_privileges(mnt_dir, name, options):
586881f68fSopenharmony_ci    if os.getuid() != 0:
596881f68fSopenharmony_ci        pytest.skip('drop_privileges requires root, skipping.')
606881f68fSopenharmony_ci
616881f68fSopenharmony_ci    return invoke_mount_fuse(mnt_dir, name, options + ('drop_privileges',))
626881f68fSopenharmony_ci
636881f68fSopenharmony_ciclass raii_tmpdir:
646881f68fSopenharmony_ci    def __init__(self):
656881f68fSopenharmony_ci        self.d = tempfile.mkdtemp()
666881f68fSopenharmony_ci
676881f68fSopenharmony_ci    def __str__(self):
686881f68fSopenharmony_ci        return str(self.d)
696881f68fSopenharmony_ci
706881f68fSopenharmony_ci    def mkdir(self, path):
716881f68fSopenharmony_ci        return py.path.local(str(self.d)).mkdir(path)
726881f68fSopenharmony_ci
736881f68fSopenharmony_ci@pytest.fixture
746881f68fSopenharmony_cidef short_tmpdir():
756881f68fSopenharmony_ci    return raii_tmpdir()
766881f68fSopenharmony_ci
776881f68fSopenharmony_cidef readdir_inode(dir):
786881f68fSopenharmony_ci    cmd = base_cmdline + [ pjoin(basename, 'test', 'readdir_inode'), dir ]
796881f68fSopenharmony_ci    with subprocess.Popen(cmd, stdout=subprocess.PIPE,
806881f68fSopenharmony_ci                          universal_newlines=True) as proc:
816881f68fSopenharmony_ci        lines = proc.communicate()[0].splitlines()
826881f68fSopenharmony_ci    lines.sort()
836881f68fSopenharmony_ci    return lines
846881f68fSopenharmony_ci
856881f68fSopenharmony_ci
866881f68fSopenharmony_ci@pytest.mark.parametrize("cmdline_builder", (invoke_directly, invoke_mount_fuse,
876881f68fSopenharmony_ci                                             invoke_mount_fuse_drop_privileges))
886881f68fSopenharmony_ci@pytest.mark.parametrize("options", powerset(options))
896881f68fSopenharmony_ci@pytest.mark.parametrize("name", ('hello', 'hello_ll'))
906881f68fSopenharmony_cidef test_hello(tmpdir, name, options, cmdline_builder, output_checker):
916881f68fSopenharmony_ci    mnt_dir = str(tmpdir)
926881f68fSopenharmony_ci    mount_process = subprocess.Popen(
936881f68fSopenharmony_ci        cmdline_builder(mnt_dir, name, options),
946881f68fSopenharmony_ci        stdout=output_checker.fd, stderr=output_checker.fd)
956881f68fSopenharmony_ci    try:
966881f68fSopenharmony_ci        wait_for_mount(mount_process, mnt_dir)
976881f68fSopenharmony_ci        assert os.listdir(mnt_dir) == [ 'hello' ]
986881f68fSopenharmony_ci        filename = pjoin(mnt_dir, 'hello')
996881f68fSopenharmony_ci        with open(filename, 'r') as fh:
1006881f68fSopenharmony_ci            assert fh.read() == 'Hello World!\n'
1016881f68fSopenharmony_ci        with pytest.raises(IOError) as exc_info:
1026881f68fSopenharmony_ci            open(filename, 'r+')
1036881f68fSopenharmony_ci        assert exc_info.value.errno == errno.EACCES
1046881f68fSopenharmony_ci        with pytest.raises(IOError) as exc_info:
1056881f68fSopenharmony_ci            open(filename + 'does-not-exist', 'r+')
1066881f68fSopenharmony_ci        assert exc_info.value.errno == errno.ENOENT
1076881f68fSopenharmony_ci        if name == 'hello_ll':
1086881f68fSopenharmony_ci            tst_xattr(mnt_dir)
1096881f68fSopenharmony_ci    except:
1106881f68fSopenharmony_ci        cleanup(mount_process, mnt_dir)
1116881f68fSopenharmony_ci        raise
1126881f68fSopenharmony_ci    else:
1136881f68fSopenharmony_ci        umount(mount_process, mnt_dir)
1146881f68fSopenharmony_ci
1156881f68fSopenharmony_ci@pytest.mark.parametrize("writeback", (False, True))
1166881f68fSopenharmony_ci@pytest.mark.parametrize("name", ('passthrough', 'passthrough_plus',
1176881f68fSopenharmony_ci                           'passthrough_fh', 'passthrough_ll'))
1186881f68fSopenharmony_ci@pytest.mark.parametrize("debug", (False, True))
1196881f68fSopenharmony_cidef test_passthrough(short_tmpdir, name, debug, output_checker, writeback):
1206881f68fSopenharmony_ci    # Avoid false positives from libfuse debug messages
1216881f68fSopenharmony_ci    if debug:
1226881f68fSopenharmony_ci        output_checker.register_output(r'^   unique: [0-9]+, error: -[0-9]+ .+$',
1236881f68fSopenharmony_ci                                       count=0)
1246881f68fSopenharmony_ci
1256881f68fSopenharmony_ci    # test_syscalls prints "No error" under FreeBSD
1266881f68fSopenharmony_ci    output_checker.register_output(r"^ \d\d \[[^\]]+ message: 'No error: 0'\]",
1276881f68fSopenharmony_ci                                   count=0)
1286881f68fSopenharmony_ci
1296881f68fSopenharmony_ci    mnt_dir = str(short_tmpdir.mkdir('mnt'))
1306881f68fSopenharmony_ci    src_dir = str(short_tmpdir.mkdir('src'))
1316881f68fSopenharmony_ci
1326881f68fSopenharmony_ci    if name == 'passthrough_plus':
1336881f68fSopenharmony_ci        cmdline = base_cmdline + \
1346881f68fSopenharmony_ci                  [ pjoin(basename, 'example', 'passthrough'),
1356881f68fSopenharmony_ci                    '--plus', '-f', mnt_dir ]
1366881f68fSopenharmony_ci    else:
1376881f68fSopenharmony_ci        cmdline = base_cmdline + \
1386881f68fSopenharmony_ci                  [ pjoin(basename, 'example', name),
1396881f68fSopenharmony_ci                    '-f', mnt_dir ]
1406881f68fSopenharmony_ci    if debug:
1416881f68fSopenharmony_ci        cmdline.append('-d')
1426881f68fSopenharmony_ci
1436881f68fSopenharmony_ci    if writeback:
1446881f68fSopenharmony_ci        if name != 'passthrough_ll':
1456881f68fSopenharmony_ci            pytest.skip('example does not support writeback caching')
1466881f68fSopenharmony_ci        cmdline.append('-o')
1476881f68fSopenharmony_ci        cmdline.append('writeback')
1486881f68fSopenharmony_ci
1496881f68fSopenharmony_ci    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
1506881f68fSopenharmony_ci                                     stderr=output_checker.fd)
1516881f68fSopenharmony_ci    try:
1526881f68fSopenharmony_ci        wait_for_mount(mount_process, mnt_dir)
1536881f68fSopenharmony_ci        work_dir = mnt_dir + src_dir
1546881f68fSopenharmony_ci
1556881f68fSopenharmony_ci        tst_statvfs(work_dir)
1566881f68fSopenharmony_ci        tst_readdir(src_dir, work_dir)
1576881f68fSopenharmony_ci        tst_readdir_big(src_dir, work_dir)
1586881f68fSopenharmony_ci        tst_open_read(src_dir, work_dir)
1596881f68fSopenharmony_ci        tst_open_write(src_dir, work_dir)
1606881f68fSopenharmony_ci        tst_create(work_dir)
1616881f68fSopenharmony_ci        tst_passthrough(src_dir, work_dir)
1626881f68fSopenharmony_ci        tst_append(src_dir, work_dir)
1636881f68fSopenharmony_ci        tst_seek(src_dir, work_dir)
1646881f68fSopenharmony_ci        tst_mkdir(work_dir)
1656881f68fSopenharmony_ci        tst_rmdir(work_dir, src_dir)
1666881f68fSopenharmony_ci        tst_unlink(work_dir, src_dir)
1676881f68fSopenharmony_ci        tst_symlink(work_dir)
1686881f68fSopenharmony_ci        if os.getuid() == 0:
1696881f68fSopenharmony_ci            tst_chown(work_dir)
1706881f68fSopenharmony_ci
1716881f68fSopenharmony_ci        # Underlying fs may not have full nanosecond resolution
1726881f68fSopenharmony_ci        tst_utimens(work_dir, ns_tol=1000)
1736881f68fSopenharmony_ci
1746881f68fSopenharmony_ci        tst_link(work_dir)
1756881f68fSopenharmony_ci        tst_truncate_path(work_dir)
1766881f68fSopenharmony_ci        tst_truncate_fd(work_dir)
1776881f68fSopenharmony_ci        tst_open_unlink(work_dir)
1786881f68fSopenharmony_ci
1796881f68fSopenharmony_ci        syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'),
1806881f68fSopenharmony_ci                             work_dir, ':' + src_dir ]
1816881f68fSopenharmony_ci        if writeback:
1826881f68fSopenharmony_ci            # When writeback caching is enabled, kernel has to open files for
1836881f68fSopenharmony_ci            # reading even when userspace opens with O_WDONLY. This fails if the
1846881f68fSopenharmony_ci            # filesystem process doesn't have special permission.
1856881f68fSopenharmony_ci            syscall_test_cmd.append('-53')
1866881f68fSopenharmony_ci        subprocess.check_call(syscall_test_cmd)
1876881f68fSopenharmony_ci    except:
1886881f68fSopenharmony_ci        cleanup(mount_process, mnt_dir)
1896881f68fSopenharmony_ci        raise
1906881f68fSopenharmony_ci    else:
1916881f68fSopenharmony_ci        umount(mount_process, mnt_dir)
1926881f68fSopenharmony_ci
1936881f68fSopenharmony_ci@pytest.mark.parametrize("cache", (False, True))
1946881f68fSopenharmony_cidef test_passthrough_hp(short_tmpdir, cache, output_checker):
1956881f68fSopenharmony_ci    mnt_dir = str(short_tmpdir.mkdir('mnt'))
1966881f68fSopenharmony_ci    src_dir = str(short_tmpdir.mkdir('src'))
1976881f68fSopenharmony_ci
1986881f68fSopenharmony_ci    cmdline = base_cmdline + \
1996881f68fSopenharmony_ci              [ pjoin(basename, 'example', 'passthrough_hp'),
2006881f68fSopenharmony_ci                src_dir, mnt_dir ]
2016881f68fSopenharmony_ci
2026881f68fSopenharmony_ci    cmdline.append('--foreground')
2036881f68fSopenharmony_ci
2046881f68fSopenharmony_ci    if not cache:
2056881f68fSopenharmony_ci        cmdline.append('--nocache')
2066881f68fSopenharmony_ci
2076881f68fSopenharmony_ci    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
2086881f68fSopenharmony_ci                                     stderr=output_checker.fd)
2096881f68fSopenharmony_ci    try:
2106881f68fSopenharmony_ci        wait_for_mount(mount_process, mnt_dir)
2116881f68fSopenharmony_ci
2126881f68fSopenharmony_ci        tst_statvfs(mnt_dir)
2136881f68fSopenharmony_ci        tst_readdir(src_dir, mnt_dir)
2146881f68fSopenharmony_ci        tst_readdir_big(src_dir, mnt_dir)
2156881f68fSopenharmony_ci        tst_open_read(src_dir, mnt_dir)
2166881f68fSopenharmony_ci        tst_open_write(src_dir, mnt_dir)
2176881f68fSopenharmony_ci        tst_create(mnt_dir)
2186881f68fSopenharmony_ci        if not cache:
2196881f68fSopenharmony_ci            tst_passthrough(src_dir, mnt_dir)
2206881f68fSopenharmony_ci        tst_append(src_dir, mnt_dir)
2216881f68fSopenharmony_ci        tst_seek(src_dir, mnt_dir)
2226881f68fSopenharmony_ci        tst_mkdir(mnt_dir)
2236881f68fSopenharmony_ci        if cache:
2246881f68fSopenharmony_ci            # if cache is enabled, no operations should go through
2256881f68fSopenharmony_ci            # src_dir as the cache will become stale.
2266881f68fSopenharmony_ci            tst_rmdir(mnt_dir)
2276881f68fSopenharmony_ci            tst_unlink(mnt_dir)
2286881f68fSopenharmony_ci        else:
2296881f68fSopenharmony_ci            tst_rmdir(mnt_dir, src_dir)
2306881f68fSopenharmony_ci            tst_unlink(mnt_dir, src_dir)
2316881f68fSopenharmony_ci        tst_symlink(mnt_dir)
2326881f68fSopenharmony_ci        if os.getuid() == 0:
2336881f68fSopenharmony_ci            tst_chown(mnt_dir)
2346881f68fSopenharmony_ci
2356881f68fSopenharmony_ci        # Underlying fs may not have full nanosecond resolution
2366881f68fSopenharmony_ci        tst_utimens(mnt_dir, ns_tol=1000)
2376881f68fSopenharmony_ci
2386881f68fSopenharmony_ci        tst_link(mnt_dir)
2396881f68fSopenharmony_ci        tst_truncate_path(mnt_dir)
2406881f68fSopenharmony_ci        tst_truncate_fd(mnt_dir)
2416881f68fSopenharmony_ci        tst_open_unlink(mnt_dir)
2426881f68fSopenharmony_ci
2436881f68fSopenharmony_ci        # test_syscalls assumes that changes in source directory
2446881f68fSopenharmony_ci        # will be reflected immediately in mountpoint, so we
2456881f68fSopenharmony_ci        # can't use it.
2466881f68fSopenharmony_ci        if not cache:
2476881f68fSopenharmony_ci            syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'),
2486881f68fSopenharmony_ci                             mnt_dir, ':' + src_dir ]
2496881f68fSopenharmony_ci            # unlinked testfiles check fails without kernel fix
2506881f68fSopenharmony_ci            # "fuse: fix illegal access to inode with reused nodeid"
2516881f68fSopenharmony_ci            # so opt-in for this test from kernel 5.14
2526881f68fSopenharmony_ci            if LooseVersion(platform.release()) >= '5.14':
2536881f68fSopenharmony_ci                syscall_test_cmd.append('-u')
2546881f68fSopenharmony_ci            subprocess.check_call(syscall_test_cmd)
2556881f68fSopenharmony_ci    except:
2566881f68fSopenharmony_ci        cleanup(mount_process, mnt_dir)
2576881f68fSopenharmony_ci        raise
2586881f68fSopenharmony_ci    else:
2596881f68fSopenharmony_ci        umount(mount_process, mnt_dir)
2606881f68fSopenharmony_ci
2616881f68fSopenharmony_ci
2626881f68fSopenharmony_ci@pytest.mark.skipif(fuse_proto < (7,11),
2636881f68fSopenharmony_ci                    reason='not supported by running kernel')
2646881f68fSopenharmony_cidef test_ioctl(tmpdir, output_checker):
2656881f68fSopenharmony_ci    progname = pjoin(basename, 'example', 'ioctl')
2666881f68fSopenharmony_ci    if not os.path.exists(progname):
2676881f68fSopenharmony_ci        pytest.skip('%s not built' % os.path.basename(progname))
2686881f68fSopenharmony_ci
2696881f68fSopenharmony_ci    mnt_dir = str(tmpdir)
2706881f68fSopenharmony_ci    testfile = pjoin(mnt_dir, 'fioc')
2716881f68fSopenharmony_ci    cmdline = base_cmdline + [progname, '-f', mnt_dir ]
2726881f68fSopenharmony_ci    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
2736881f68fSopenharmony_ci                                     stderr=output_checker.fd)
2746881f68fSopenharmony_ci    try:
2756881f68fSopenharmony_ci        wait_for_mount(mount_process, mnt_dir)
2766881f68fSopenharmony_ci
2776881f68fSopenharmony_ci        cmdline = base_cmdline + \
2786881f68fSopenharmony_ci                  [ pjoin(basename, 'example', 'ioctl_client'),
2796881f68fSopenharmony_ci                    testfile ]
2806881f68fSopenharmony_ci        assert subprocess.check_output(cmdline) == b'0\n'
2816881f68fSopenharmony_ci        with open(testfile, 'wb') as fh:
2826881f68fSopenharmony_ci            fh.write(b'foobar')
2836881f68fSopenharmony_ci        assert subprocess.check_output(cmdline) == b'6\n'
2846881f68fSopenharmony_ci        subprocess.check_call(cmdline + [ '3' ])
2856881f68fSopenharmony_ci        with open(testfile, 'rb') as fh:
2866881f68fSopenharmony_ci            assert fh.read()== b'foo'
2876881f68fSopenharmony_ci    except:
2886881f68fSopenharmony_ci        cleanup(mount_process, mnt_dir)
2896881f68fSopenharmony_ci        raise
2906881f68fSopenharmony_ci    else:
2916881f68fSopenharmony_ci        umount(mount_process, mnt_dir)
2926881f68fSopenharmony_ci
2936881f68fSopenharmony_cidef test_poll(tmpdir, output_checker):
2946881f68fSopenharmony_ci    mnt_dir = str(tmpdir)
2956881f68fSopenharmony_ci    cmdline = base_cmdline + [pjoin(basename, 'example', 'poll'),
2966881f68fSopenharmony_ci               '-f', mnt_dir ]
2976881f68fSopenharmony_ci    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
2986881f68fSopenharmony_ci                                     stderr=output_checker.fd)
2996881f68fSopenharmony_ci    try:
3006881f68fSopenharmony_ci        wait_for_mount(mount_process, mnt_dir)
3016881f68fSopenharmony_ci        cmdline = base_cmdline + \
3026881f68fSopenharmony_ci                  [ pjoin(basename, 'example', 'poll_client') ]
3036881f68fSopenharmony_ci        subprocess.check_call(cmdline, cwd=mnt_dir)
3046881f68fSopenharmony_ci    except:
3056881f68fSopenharmony_ci        cleanup(mount_process, mnt_dir)
3066881f68fSopenharmony_ci        raise
3076881f68fSopenharmony_ci    else:
3086881f68fSopenharmony_ci        umount(mount_process, mnt_dir)
3096881f68fSopenharmony_ci
3106881f68fSopenharmony_cidef test_null(tmpdir, output_checker):
3116881f68fSopenharmony_ci    progname = pjoin(basename, 'example', 'null')
3126881f68fSopenharmony_ci    if not os.path.exists(progname):
3136881f68fSopenharmony_ci        pytest.skip('%s not built' % os.path.basename(progname))
3146881f68fSopenharmony_ci
3156881f68fSopenharmony_ci    mnt_file = str(tmpdir) + '/file'
3166881f68fSopenharmony_ci    with open(mnt_file, 'w') as fh:
3176881f68fSopenharmony_ci        fh.write('dummy')
3186881f68fSopenharmony_ci    cmdline = base_cmdline + [ progname, '-f', mnt_file ]
3196881f68fSopenharmony_ci    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
3206881f68fSopenharmony_ci                                     stderr=output_checker.fd)
3216881f68fSopenharmony_ci    def test_fn(name):
3226881f68fSopenharmony_ci        return os.stat(name).st_size > 4000
3236881f68fSopenharmony_ci    try:
3246881f68fSopenharmony_ci        wait_for_mount(mount_process, mnt_file, test_fn)
3256881f68fSopenharmony_ci        with open(mnt_file, 'rb') as fh:
3266881f68fSopenharmony_ci            assert fh.read(382) == b'\0' * 382
3276881f68fSopenharmony_ci        with open(mnt_file, 'wb') as fh:
3286881f68fSopenharmony_ci            fh.write(b'whatever')
3296881f68fSopenharmony_ci    except:
3306881f68fSopenharmony_ci        cleanup(mount_process, mnt_file)
3316881f68fSopenharmony_ci        raise
3326881f68fSopenharmony_ci    else:
3336881f68fSopenharmony_ci        umount(mount_process, mnt_file)
3346881f68fSopenharmony_ci
3356881f68fSopenharmony_ci
3366881f68fSopenharmony_ci@pytest.mark.skipif(fuse_proto < (7,12),
3376881f68fSopenharmony_ci                    reason='not supported by running kernel')
3386881f68fSopenharmony_ci@pytest.mark.parametrize("only_expire", ("invalidate_entries", "expire_entries"))
3396881f68fSopenharmony_ci@pytest.mark.parametrize("notify", (True, False))
3406881f68fSopenharmony_cidef test_notify_inval_entry(tmpdir, only_expire, notify, output_checker):
3416881f68fSopenharmony_ci    mnt_dir = str(tmpdir)
3426881f68fSopenharmony_ci    cmdline = base_cmdline + \
3436881f68fSopenharmony_ci              [ pjoin(basename, 'example', 'notify_inval_entry'),
3446881f68fSopenharmony_ci                '-f', '--update-interval=1',
3456881f68fSopenharmony_ci                '--timeout=5', mnt_dir ]
3466881f68fSopenharmony_ci    if not notify:
3476881f68fSopenharmony_ci        cmdline.append('--no-notify')
3486881f68fSopenharmony_ci    if only_expire == "expire_entries":
3496881f68fSopenharmony_ci        cmdline.append('--only-expire')
3506881f68fSopenharmony_ci        if fuse_proto < (7,38):
3516881f68fSopenharmony_ci            pytest.skip('only-expire not supported by running kernel')
3526881f68fSopenharmony_ci    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
3536881f68fSopenharmony_ci                                     stderr=output_checker.fd)
3546881f68fSopenharmony_ci    try:
3556881f68fSopenharmony_ci        wait_for_mount(mount_process, mnt_dir)
3566881f68fSopenharmony_ci        fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
3576881f68fSopenharmony_ci        try:
3586881f68fSopenharmony_ci            os.stat(fname)
3596881f68fSopenharmony_ci        except FileNotFoundError:
3606881f68fSopenharmony_ci            # We may have hit a race condition and issued
3616881f68fSopenharmony_ci            # readdir just before the name changed
3626881f68fSopenharmony_ci            fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
3636881f68fSopenharmony_ci            os.stat(fname)
3646881f68fSopenharmony_ci
3656881f68fSopenharmony_ci        safe_sleep(2)
3666881f68fSopenharmony_ci        if not notify:
3676881f68fSopenharmony_ci            os.stat(fname)
3686881f68fSopenharmony_ci            safe_sleep(5)
3696881f68fSopenharmony_ci        with pytest.raises(FileNotFoundError):
3706881f68fSopenharmony_ci            os.stat(fname)
3716881f68fSopenharmony_ci    except:
3726881f68fSopenharmony_ci        cleanup(mount_process, mnt_dir)
3736881f68fSopenharmony_ci        raise
3746881f68fSopenharmony_ci    else:
3756881f68fSopenharmony_ci        umount(mount_process, mnt_dir)
3766881f68fSopenharmony_ci
3776881f68fSopenharmony_ci@pytest.mark.parametrize("intended_user", ('root', 'non_root'))
3786881f68fSopenharmony_cidef test_dev_auto_unmount(short_tmpdir, output_checker, intended_user):
3796881f68fSopenharmony_ci    """Check that root can mount with dev and auto_unmount
3806881f68fSopenharmony_ci    (but non-root cannot).
3816881f68fSopenharmony_ci    Split into root vs non-root, so that the output of pytest
3826881f68fSopenharmony_ci    makes clear what functionality is being tested."""
3836881f68fSopenharmony_ci    if os.getuid() == 0 and intended_user == 'non_root':
3846881f68fSopenharmony_ci        pytest.skip('needs to run as non-root')
3856881f68fSopenharmony_ci    if os.getuid() != 0 and intended_user == 'root':
3866881f68fSopenharmony_ci        pytest.skip('needs to run as root')
3876881f68fSopenharmony_ci    mnt_dir = str(short_tmpdir.mkdir('mnt'))
3886881f68fSopenharmony_ci    src_dir = str('/dev')
3896881f68fSopenharmony_ci    cmdline = base_cmdline + \
3906881f68fSopenharmony_ci                [ pjoin(basename, 'example', 'passthrough_ll'),
3916881f68fSopenharmony_ci                '-o', f'source={src_dir},dev,auto_unmount',
3926881f68fSopenharmony_ci                '-f', mnt_dir ]
3936881f68fSopenharmony_ci    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
3946881f68fSopenharmony_ci                                     stderr=output_checker.fd)
3956881f68fSopenharmony_ci    try:
3966881f68fSopenharmony_ci        wait_for_mount(mount_process, mnt_dir)
3976881f68fSopenharmony_ci        if os.getuid() == 0:
3986881f68fSopenharmony_ci            open(pjoin(mnt_dir, 'null')).close()
3996881f68fSopenharmony_ci        else:
4006881f68fSopenharmony_ci            with pytest.raises(PermissionError):
4016881f68fSopenharmony_ci                open(pjoin(mnt_dir, 'null')).close()
4026881f68fSopenharmony_ci    except:
4036881f68fSopenharmony_ci        cleanup(mount_process, mnt_dir)
4046881f68fSopenharmony_ci        raise
4056881f68fSopenharmony_ci    else:
4066881f68fSopenharmony_ci        umount(mount_process, mnt_dir)
4076881f68fSopenharmony_ci
4086881f68fSopenharmony_ci@pytest.mark.skipif(os.getuid() != 0,
4096881f68fSopenharmony_ci                    reason='needs to run as root')
4106881f68fSopenharmony_cidef test_cuse(output_checker):
4116881f68fSopenharmony_ci
4126881f68fSopenharmony_ci    # Valgrind warns about unknown ioctls, that's ok
4136881f68fSopenharmony_ci    output_checker.register_output(r'^==([0-9]+).+unhandled ioctl.+\n'
4146881f68fSopenharmony_ci                                   r'==\1== \s{3}.+\n'
4156881f68fSopenharmony_ci                                   r'==\1== \s{3}.+$', count=0)
4166881f68fSopenharmony_ci
4176881f68fSopenharmony_ci    devname = 'cuse-test-%d' % os.getpid()
4186881f68fSopenharmony_ci    devpath = '/dev/%s' % devname
4196881f68fSopenharmony_ci    cmdline = base_cmdline + \
4206881f68fSopenharmony_ci              [ pjoin(basename, 'example', 'cuse'),
4216881f68fSopenharmony_ci                '-f', '--name=%s' % devname ]
4226881f68fSopenharmony_ci    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
4236881f68fSopenharmony_ci                                     stderr=output_checker.fd)
4246881f68fSopenharmony_ci
4256881f68fSopenharmony_ci    cmdline = base_cmdline + \
4266881f68fSopenharmony_ci              [ pjoin(basename, 'example', 'cuse_client'),
4276881f68fSopenharmony_ci                devpath ]
4286881f68fSopenharmony_ci    try:
4296881f68fSopenharmony_ci        wait_for_mount(mount_process, devpath,
4306881f68fSopenharmony_ci                       test_fn=os.path.exists)
4316881f68fSopenharmony_ci        assert subprocess.check_output(cmdline + ['s']) == b'0\n'
4326881f68fSopenharmony_ci        data = b'some test data'
4336881f68fSopenharmony_ci        off = 5
4346881f68fSopenharmony_ci        proc = subprocess.Popen(cmdline + [ 'w', str(len(data)), str(off) ],
4356881f68fSopenharmony_ci                                stdin=subprocess.PIPE)
4366881f68fSopenharmony_ci        proc.stdin.write(data)
4376881f68fSopenharmony_ci        proc.stdin.close()
4386881f68fSopenharmony_ci        assert proc.wait(timeout=10) == 0
4396881f68fSopenharmony_ci        size = str(off + len(data)).encode() + b'\n'
4406881f68fSopenharmony_ci        assert subprocess.check_output(cmdline + ['s']) == size
4416881f68fSopenharmony_ci        out = subprocess.check_output(
4426881f68fSopenharmony_ci            cmdline + [ 'r', str(off + len(data) + 2), '0' ])
4436881f68fSopenharmony_ci        assert out == (b'\0' * off) + data
4446881f68fSopenharmony_ci    finally:
4456881f68fSopenharmony_ci        mount_process.terminate()
4466881f68fSopenharmony_ci
4476881f68fSopenharmony_cidef test_release_unlink_race(tmpdir, output_checker):
4486881f68fSopenharmony_ci    """test case for Issue #746
4496881f68fSopenharmony_ci
4506881f68fSopenharmony_ci    If RELEASE and UNLINK opcodes are sent back to back, and fuse_fs_release()
4516881f68fSopenharmony_ci    and fuse_fs_rename() are slow to execute, UNLINK will run while RELEASE is
4526881f68fSopenharmony_ci    still executing. UNLINK will try to rename the file and, while the rename
4536881f68fSopenharmony_ci    is happening, the RELEASE will finish executing. As a result, RELEASE will
4546881f68fSopenharmony_ci    not detect in time that UNLINK has happened, and UNLINK will not detect in
4556881f68fSopenharmony_ci    time that RELEASE has happened.
4566881f68fSopenharmony_ci
4576881f68fSopenharmony_ci
4586881f68fSopenharmony_ci    NOTE: This is triggered only when nullpath_ok is set.
4596881f68fSopenharmony_ci
4606881f68fSopenharmony_ci    If it is NOT SET then get_path_nullok() called by fuse_lib_release() will
4616881f68fSopenharmony_ci    call get_path_common() and lock the path, and then the fuse_lib_unlink()
4626881f68fSopenharmony_ci    will wait for the path to be unlocked before executing and thus synchronise
4636881f68fSopenharmony_ci    with fuse_lib_release().
4646881f68fSopenharmony_ci
4656881f68fSopenharmony_ci    If it is SET then get_path_nullok() will just set the path to null and
4666881f68fSopenharmony_ci    return without locking anything and thus allowing fuse_lib_unlink() to
4676881f68fSopenharmony_ci    eventually execute unimpeded while fuse_lib_release() is still running.
4686881f68fSopenharmony_ci    """
4696881f68fSopenharmony_ci
4706881f68fSopenharmony_ci    fuse_mountpoint = str(tmpdir)
4716881f68fSopenharmony_ci
4726881f68fSopenharmony_ci    fuse_binary_command = base_cmdline + \
4736881f68fSopenharmony_ci        [ pjoin(basename, 'test', 'release_unlink_race'),
4746881f68fSopenharmony_ci        "-f", fuse_mountpoint]
4756881f68fSopenharmony_ci
4766881f68fSopenharmony_ci    fuse_process = subprocess.Popen(fuse_binary_command,
4776881f68fSopenharmony_ci                                   stdout=output_checker.fd,
4786881f68fSopenharmony_ci                                   stderr=output_checker.fd)
4796881f68fSopenharmony_ci
4806881f68fSopenharmony_ci    try:
4816881f68fSopenharmony_ci        wait_for_mount(fuse_process, fuse_mountpoint)
4826881f68fSopenharmony_ci
4836881f68fSopenharmony_ci        temp_dir = tempfile.TemporaryDirectory(dir="/tmp/")
4846881f68fSopenharmony_ci        temp_dir_path = temp_dir.name
4856881f68fSopenharmony_ci
4866881f68fSopenharmony_ci        fuse_temp_file, fuse_temp_file_path = tempfile.mkstemp(dir=(fuse_mountpoint + temp_dir_path))
4876881f68fSopenharmony_ci
4886881f68fSopenharmony_ci        os.close(fuse_temp_file)
4896881f68fSopenharmony_ci        os.unlink(fuse_temp_file_path)
4906881f68fSopenharmony_ci
4916881f68fSopenharmony_ci        # needed for slow CI/CD pipelines for unlink OP to complete processing
4926881f68fSopenharmony_ci        safe_sleep(3)
4936881f68fSopenharmony_ci
4946881f68fSopenharmony_ci        assert os.listdir(temp_dir_path) == []
4956881f68fSopenharmony_ci
4966881f68fSopenharmony_ci    except:
4976881f68fSopenharmony_ci        temp_dir.cleanup()
4986881f68fSopenharmony_ci        cleanup(fuse_process, fuse_mountpoint)
4996881f68fSopenharmony_ci        raise
5006881f68fSopenharmony_ci
5016881f68fSopenharmony_ci    else:
5026881f68fSopenharmony_ci        temp_dir.cleanup()
5036881f68fSopenharmony_ci        umount(fuse_process, fuse_mountpoint)
5046881f68fSopenharmony_ci
5056881f68fSopenharmony_ci
5066881f68fSopenharmony_ci@contextmanager
5076881f68fSopenharmony_cidef os_open(name, flags):
5086881f68fSopenharmony_ci    fd = os.open(name, flags)
5096881f68fSopenharmony_ci    try:
5106881f68fSopenharmony_ci        yield fd
5116881f68fSopenharmony_ci    finally:
5126881f68fSopenharmony_ci        os.close(fd)
5136881f68fSopenharmony_ci
5146881f68fSopenharmony_cidef os_create(name):
5156881f68fSopenharmony_ci    os.close(os.open(name, os.O_CREAT | os.O_RDWR))
5166881f68fSopenharmony_ci
5176881f68fSopenharmony_cidef tst_unlink(mnt_dir, src_dir=None):
5186881f68fSopenharmony_ci    name = name_generator()
5196881f68fSopenharmony_ci    fullname = mnt_dir + "/" + name
5206881f68fSopenharmony_ci    srcname = fullname
5216881f68fSopenharmony_ci    if src_dir is not None:
5226881f68fSopenharmony_ci        srcname = pjoin(src_dir, name)
5236881f68fSopenharmony_ci    with open(srcname, 'wb') as fh:
5246881f68fSopenharmony_ci        fh.write(b'hello')
5256881f68fSopenharmony_ci    assert name in os.listdir(mnt_dir)
5266881f68fSopenharmony_ci    os.unlink(fullname)
5276881f68fSopenharmony_ci    with pytest.raises(OSError) as exc_info:
5286881f68fSopenharmony_ci        os.stat(fullname)
5296881f68fSopenharmony_ci    assert exc_info.value.errno == errno.ENOENT
5306881f68fSopenharmony_ci    assert name not in os.listdir(mnt_dir)
5316881f68fSopenharmony_ci
5326881f68fSopenharmony_cidef tst_mkdir(mnt_dir):
5336881f68fSopenharmony_ci    dirname = name_generator()
5346881f68fSopenharmony_ci    fullname = mnt_dir + "/" + dirname
5356881f68fSopenharmony_ci    os.mkdir(fullname)
5366881f68fSopenharmony_ci    fstat = os.stat(fullname)
5376881f68fSopenharmony_ci    assert stat.S_ISDIR(fstat.st_mode)
5386881f68fSopenharmony_ci    assert os.listdir(fullname) ==  []
5396881f68fSopenharmony_ci    # Some filesystem (e.g. BTRFS) don't track st_nlink for directories
5406881f68fSopenharmony_ci    assert fstat.st_nlink in (1,2)
5416881f68fSopenharmony_ci    assert dirname in os.listdir(mnt_dir)
5426881f68fSopenharmony_ci
5436881f68fSopenharmony_cidef tst_rmdir(mnt_dir, src_dir=None):
5446881f68fSopenharmony_ci    name = name_generator()
5456881f68fSopenharmony_ci    fullname = mnt_dir + "/" + name
5466881f68fSopenharmony_ci    srcname = fullname
5476881f68fSopenharmony_ci    if src_dir is not None:
5486881f68fSopenharmony_ci        srcname = pjoin(src_dir, name)
5496881f68fSopenharmony_ci    os.mkdir(srcname)
5506881f68fSopenharmony_ci    assert name in os.listdir(mnt_dir)
5516881f68fSopenharmony_ci    os.rmdir(fullname)
5526881f68fSopenharmony_ci    with pytest.raises(OSError) as exc_info:
5536881f68fSopenharmony_ci        os.stat(fullname)
5546881f68fSopenharmony_ci    assert exc_info.value.errno == errno.ENOENT
5556881f68fSopenharmony_ci    assert name not in os.listdir(mnt_dir)
5566881f68fSopenharmony_ci
5576881f68fSopenharmony_cidef tst_symlink(mnt_dir):
5586881f68fSopenharmony_ci    linkname = name_generator()
5596881f68fSopenharmony_ci    fullname = mnt_dir + "/" + linkname
5606881f68fSopenharmony_ci    os.symlink("/imaginary/dest", fullname)
5616881f68fSopenharmony_ci    fstat = os.lstat(fullname)
5626881f68fSopenharmony_ci    assert stat.S_ISLNK(fstat.st_mode)
5636881f68fSopenharmony_ci    assert os.readlink(fullname) == "/imaginary/dest"
5646881f68fSopenharmony_ci    assert fstat.st_nlink == 1
5656881f68fSopenharmony_ci    assert linkname in os.listdir(mnt_dir)
5666881f68fSopenharmony_ci
5676881f68fSopenharmony_cidef tst_create(mnt_dir):
5686881f68fSopenharmony_ci    name = name_generator()
5696881f68fSopenharmony_ci    fullname = pjoin(mnt_dir, name)
5706881f68fSopenharmony_ci    with pytest.raises(OSError) as exc_info:
5716881f68fSopenharmony_ci        os.stat(fullname)
5726881f68fSopenharmony_ci    assert exc_info.value.errno == errno.ENOENT
5736881f68fSopenharmony_ci    assert name not in os.listdir(mnt_dir)
5746881f68fSopenharmony_ci
5756881f68fSopenharmony_ci    fd = os.open(fullname, os.O_CREAT | os.O_RDWR)
5766881f68fSopenharmony_ci    os.close(fd)
5776881f68fSopenharmony_ci
5786881f68fSopenharmony_ci    assert name in os.listdir(mnt_dir)
5796881f68fSopenharmony_ci    fstat = os.lstat(fullname)
5806881f68fSopenharmony_ci    assert stat.S_ISREG(fstat.st_mode)
5816881f68fSopenharmony_ci    assert fstat.st_nlink == 1
5826881f68fSopenharmony_ci    assert fstat.st_size == 0
5836881f68fSopenharmony_ci
5846881f68fSopenharmony_cidef tst_chown(mnt_dir):
5856881f68fSopenharmony_ci    filename = pjoin(mnt_dir, name_generator())
5866881f68fSopenharmony_ci    os.mkdir(filename)
5876881f68fSopenharmony_ci    fstat = os.lstat(filename)
5886881f68fSopenharmony_ci    uid = fstat.st_uid
5896881f68fSopenharmony_ci    gid = fstat.st_gid
5906881f68fSopenharmony_ci
5916881f68fSopenharmony_ci    uid_new = uid + 1
5926881f68fSopenharmony_ci    os.chown(filename, uid_new, -1)
5936881f68fSopenharmony_ci    fstat = os.lstat(filename)
5946881f68fSopenharmony_ci    assert fstat.st_uid == uid_new
5956881f68fSopenharmony_ci    assert fstat.st_gid == gid
5966881f68fSopenharmony_ci
5976881f68fSopenharmony_ci    gid_new = gid + 1
5986881f68fSopenharmony_ci    os.chown(filename, -1, gid_new)
5996881f68fSopenharmony_ci    fstat = os.lstat(filename)
6006881f68fSopenharmony_ci    assert fstat.st_uid == uid_new
6016881f68fSopenharmony_ci    assert fstat.st_gid == gid_new
6026881f68fSopenharmony_ci
6036881f68fSopenharmony_cidef tst_open_read(src_dir, mnt_dir):
6046881f68fSopenharmony_ci    name = name_generator()
6056881f68fSopenharmony_ci    with open(pjoin(src_dir, name), 'wb') as fh_out, \
6066881f68fSopenharmony_ci         open(TEST_FILE, 'rb') as fh_in:
6076881f68fSopenharmony_ci        shutil.copyfileobj(fh_in, fh_out)
6086881f68fSopenharmony_ci
6096881f68fSopenharmony_ci    assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False)
6106881f68fSopenharmony_ci
6116881f68fSopenharmony_cidef tst_open_write(src_dir, mnt_dir):
6126881f68fSopenharmony_ci    name = name_generator()
6136881f68fSopenharmony_ci    os_create(pjoin(src_dir, name))
6146881f68fSopenharmony_ci    fullname = pjoin(mnt_dir, name)
6156881f68fSopenharmony_ci    with open(fullname, 'wb') as fh_out, \
6166881f68fSopenharmony_ci         open(TEST_FILE, 'rb') as fh_in:
6176881f68fSopenharmony_ci        shutil.copyfileobj(fh_in, fh_out)
6186881f68fSopenharmony_ci
6196881f68fSopenharmony_ci    assert filecmp.cmp(fullname, TEST_FILE, False)
6206881f68fSopenharmony_ci
6216881f68fSopenharmony_cidef tst_append(src_dir, mnt_dir):
6226881f68fSopenharmony_ci    name = name_generator()
6236881f68fSopenharmony_ci    os_create(pjoin(src_dir, name))
6246881f68fSopenharmony_ci    fullname = pjoin(mnt_dir, name)
6256881f68fSopenharmony_ci    with os_open(fullname, os.O_WRONLY) as fd:
6266881f68fSopenharmony_ci        os.write(fd, b'foo\n')
6276881f68fSopenharmony_ci    with os_open(fullname, os.O_WRONLY|os.O_APPEND) as fd:
6286881f68fSopenharmony_ci        os.write(fd, b'bar\n')
6296881f68fSopenharmony_ci
6306881f68fSopenharmony_ci    with open(fullname, 'rb') as fh:
6316881f68fSopenharmony_ci        assert fh.read() == b'foo\nbar\n'
6326881f68fSopenharmony_ci
6336881f68fSopenharmony_cidef tst_seek(src_dir, mnt_dir):
6346881f68fSopenharmony_ci    name = name_generator()
6356881f68fSopenharmony_ci    os_create(pjoin(src_dir, name))
6366881f68fSopenharmony_ci    fullname = pjoin(mnt_dir, name)
6376881f68fSopenharmony_ci    with os_open(fullname, os.O_WRONLY) as fd:
6386881f68fSopenharmony_ci        os.lseek(fd, 1, os.SEEK_SET)
6396881f68fSopenharmony_ci        os.write(fd, b'foobar\n')
6406881f68fSopenharmony_ci    with os_open(fullname, os.O_WRONLY) as fd:
6416881f68fSopenharmony_ci        os.lseek(fd, 4, os.SEEK_SET)
6426881f68fSopenharmony_ci        os.write(fd, b'com')
6436881f68fSopenharmony_ci
6446881f68fSopenharmony_ci    with open(fullname, 'rb') as fh:
6456881f68fSopenharmony_ci        assert fh.read() == b'\0foocom\n'
6466881f68fSopenharmony_ci
6476881f68fSopenharmony_cidef tst_open_unlink(mnt_dir):
6486881f68fSopenharmony_ci    name = pjoin(mnt_dir, name_generator())
6496881f68fSopenharmony_ci    data1 = b'foo'
6506881f68fSopenharmony_ci    data2 = b'bar'
6516881f68fSopenharmony_ci    fullname = pjoin(mnt_dir, name)
6526881f68fSopenharmony_ci    with open(fullname, 'wb+', buffering=0) as fh:
6536881f68fSopenharmony_ci        fh.write(data1)
6546881f68fSopenharmony_ci        os.unlink(fullname)
6556881f68fSopenharmony_ci        with pytest.raises(OSError) as exc_info:
6566881f68fSopenharmony_ci            os.stat(fullname)
6576881f68fSopenharmony_ci        assert exc_info.value.errno == errno.ENOENT
6586881f68fSopenharmony_ci        assert name not in os.listdir(mnt_dir)
6596881f68fSopenharmony_ci        fh.write(data2)
6606881f68fSopenharmony_ci        fh.seek(0)
6616881f68fSopenharmony_ci        assert fh.read() == data1+data2
6626881f68fSopenharmony_ci
6636881f68fSopenharmony_cidef tst_statvfs(mnt_dir):
6646881f68fSopenharmony_ci    os.statvfs(mnt_dir)
6656881f68fSopenharmony_ci
6666881f68fSopenharmony_cidef tst_link(mnt_dir):
6676881f68fSopenharmony_ci    name1 = pjoin(mnt_dir, name_generator())
6686881f68fSopenharmony_ci    name2 = pjoin(mnt_dir, name_generator())
6696881f68fSopenharmony_ci    shutil.copyfile(TEST_FILE, name1)
6706881f68fSopenharmony_ci    assert filecmp.cmp(name1, TEST_FILE, False)
6716881f68fSopenharmony_ci
6726881f68fSopenharmony_ci    fstat1 = os.lstat(name1)
6736881f68fSopenharmony_ci    assert fstat1.st_nlink == 1
6746881f68fSopenharmony_ci
6756881f68fSopenharmony_ci    os.link(name1, name2)
6766881f68fSopenharmony_ci
6776881f68fSopenharmony_ci    fstat1 = os.lstat(name1)
6786881f68fSopenharmony_ci    fstat2 = os.lstat(name2)
6796881f68fSopenharmony_ci    assert fstat1 == fstat2
6806881f68fSopenharmony_ci    assert fstat1.st_nlink == 2
6816881f68fSopenharmony_ci    assert os.path.basename(name2) in os.listdir(mnt_dir)
6826881f68fSopenharmony_ci    assert filecmp.cmp(name1, name2, False)
6836881f68fSopenharmony_ci
6846881f68fSopenharmony_ci    # Since RELEASE requests are asynchronous, it is possible that
6856881f68fSopenharmony_ci    # libfuse still considers the file to be open at this point
6866881f68fSopenharmony_ci    # and (since -o hard_remove is not used) renames it instead of
6876881f68fSopenharmony_ci    # deleting it. In that case, the following lstat() call will
6886881f68fSopenharmony_ci    # still report an st_nlink value of 2 (cf. issue #157).
6896881f68fSopenharmony_ci    os.unlink(name2)
6906881f68fSopenharmony_ci
6916881f68fSopenharmony_ci    assert os.path.basename(name2) not in os.listdir(mnt_dir)
6926881f68fSopenharmony_ci    with pytest.raises(FileNotFoundError):
6936881f68fSopenharmony_ci        os.lstat(name2)
6946881f68fSopenharmony_ci
6956881f68fSopenharmony_ci    # See above, we may have to wait until RELEASE has been
6966881f68fSopenharmony_ci    # received before the st_nlink value is correct.
6976881f68fSopenharmony_ci    maxwait = time.time() + 2
6986881f68fSopenharmony_ci    fstat1 = os.lstat(name1)
6996881f68fSopenharmony_ci    while fstat1.st_nlink == 2 and time.time() < maxwait:
7006881f68fSopenharmony_ci        fstat1 = os.lstat(name1)
7016881f68fSopenharmony_ci        time.sleep(0.1)
7026881f68fSopenharmony_ci    assert fstat1.st_nlink == 1
7036881f68fSopenharmony_ci
7046881f68fSopenharmony_ci    os.unlink(name1)
7056881f68fSopenharmony_ci
7066881f68fSopenharmony_cidef tst_readdir(src_dir, mnt_dir):
7076881f68fSopenharmony_ci    newdir = name_generator()
7086881f68fSopenharmony_ci
7096881f68fSopenharmony_ci    src_newdir = pjoin(src_dir, newdir)
7106881f68fSopenharmony_ci    mnt_newdir = pjoin(mnt_dir, newdir)
7116881f68fSopenharmony_ci    file_ = src_newdir + "/" + name_generator()
7126881f68fSopenharmony_ci    subdir = src_newdir + "/" + name_generator()
7136881f68fSopenharmony_ci    subfile = subdir + "/" + name_generator()
7146881f68fSopenharmony_ci
7156881f68fSopenharmony_ci    os.mkdir(src_newdir)
7166881f68fSopenharmony_ci    shutil.copyfile(TEST_FILE, file_)
7176881f68fSopenharmony_ci    os.mkdir(subdir)
7186881f68fSopenharmony_ci    shutil.copyfile(TEST_FILE, subfile)
7196881f68fSopenharmony_ci
7206881f68fSopenharmony_ci    listdir_is = os.listdir(mnt_newdir)
7216881f68fSopenharmony_ci    listdir_is.sort()
7226881f68fSopenharmony_ci    listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
7236881f68fSopenharmony_ci    listdir_should.sort()
7246881f68fSopenharmony_ci    assert listdir_is == listdir_should
7256881f68fSopenharmony_ci
7266881f68fSopenharmony_ci    inodes_is = readdir_inode(mnt_newdir)
7276881f68fSopenharmony_ci    inodes_should = readdir_inode(src_newdir)
7286881f68fSopenharmony_ci    assert inodes_is == inodes_should
7296881f68fSopenharmony_ci
7306881f68fSopenharmony_ci    os.unlink(file_)
7316881f68fSopenharmony_ci    os.unlink(subfile)
7326881f68fSopenharmony_ci    os.rmdir(subdir)
7336881f68fSopenharmony_ci    os.rmdir(src_newdir)
7346881f68fSopenharmony_ci
7356881f68fSopenharmony_cidef tst_readdir_big(src_dir, mnt_dir):
7366881f68fSopenharmony_ci
7376881f68fSopenharmony_ci    # Add enough entries so that readdir needs to be called
7386881f68fSopenharmony_ci    # multiple times.
7396881f68fSopenharmony_ci    fnames = []
7406881f68fSopenharmony_ci    for i in range(500):
7416881f68fSopenharmony_ci        fname  = ('A rather long filename to make sure that we '
7426881f68fSopenharmony_ci                  'fill up the buffer - ' * 3) + str(i)
7436881f68fSopenharmony_ci        with open(pjoin(src_dir, fname), 'w') as fh:
7446881f68fSopenharmony_ci            fh.write('File %d' % i)
7456881f68fSopenharmony_ci        fnames.append(fname)
7466881f68fSopenharmony_ci
7476881f68fSopenharmony_ci    listdir_is = sorted(os.listdir(mnt_dir))
7486881f68fSopenharmony_ci    listdir_should = sorted(os.listdir(src_dir))
7496881f68fSopenharmony_ci    assert listdir_is == listdir_should
7506881f68fSopenharmony_ci
7516881f68fSopenharmony_ci    inodes_is = readdir_inode(mnt_dir)
7526881f68fSopenharmony_ci    inodes_should = readdir_inode(src_dir)
7536881f68fSopenharmony_ci    assert inodes_is == inodes_should
7546881f68fSopenharmony_ci
7556881f68fSopenharmony_ci    for fname in fnames:
7566881f68fSopenharmony_ci        stat_src = os.stat(pjoin(src_dir, fname))
7576881f68fSopenharmony_ci        stat_mnt = os.stat(pjoin(mnt_dir, fname))
7586881f68fSopenharmony_ci        assert stat_src.st_ino == stat_mnt.st_ino
7596881f68fSopenharmony_ci        assert stat_src.st_mtime == stat_mnt.st_mtime
7606881f68fSopenharmony_ci        assert stat_src.st_ctime == stat_mnt.st_ctime
7616881f68fSopenharmony_ci        assert stat_src.st_size == stat_mnt.st_size
7626881f68fSopenharmony_ci        os.unlink(pjoin(src_dir, fname))
7636881f68fSopenharmony_ci
7646881f68fSopenharmony_cidef tst_truncate_path(mnt_dir):
7656881f68fSopenharmony_ci    assert len(TEST_DATA) > 1024
7666881f68fSopenharmony_ci
7676881f68fSopenharmony_ci    filename = pjoin(mnt_dir, name_generator())
7686881f68fSopenharmony_ci    with open(filename, 'wb') as fh:
7696881f68fSopenharmony_ci        fh.write(TEST_DATA)
7706881f68fSopenharmony_ci
7716881f68fSopenharmony_ci    fstat = os.stat(filename)
7726881f68fSopenharmony_ci    size = fstat.st_size
7736881f68fSopenharmony_ci    assert size == len(TEST_DATA)
7746881f68fSopenharmony_ci
7756881f68fSopenharmony_ci    # Add zeros at the end
7766881f68fSopenharmony_ci    os.truncate(filename, size + 1024)
7776881f68fSopenharmony_ci    assert os.stat(filename).st_size == size + 1024
7786881f68fSopenharmony_ci    with open(filename, 'rb') as fh:
7796881f68fSopenharmony_ci        assert fh.read(size) == TEST_DATA
7806881f68fSopenharmony_ci        assert fh.read(1025) == b'\0' * 1024
7816881f68fSopenharmony_ci
7826881f68fSopenharmony_ci    # Truncate data
7836881f68fSopenharmony_ci    os.truncate(filename, size - 1024)
7846881f68fSopenharmony_ci    assert os.stat(filename).st_size == size - 1024
7856881f68fSopenharmony_ci    with open(filename, 'rb') as fh:
7866881f68fSopenharmony_ci        assert fh.read(size) == TEST_DATA[:size-1024]
7876881f68fSopenharmony_ci
7886881f68fSopenharmony_ci    os.unlink(filename)
7896881f68fSopenharmony_ci
7906881f68fSopenharmony_cidef tst_truncate_fd(mnt_dir):
7916881f68fSopenharmony_ci    assert len(TEST_DATA) > 1024
7926881f68fSopenharmony_ci    with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
7936881f68fSopenharmony_ci        fd = fh.fileno()
7946881f68fSopenharmony_ci        fh.write(TEST_DATA)
7956881f68fSopenharmony_ci        fstat = os.fstat(fd)
7966881f68fSopenharmony_ci        size = fstat.st_size
7976881f68fSopenharmony_ci        assert size == len(TEST_DATA)
7986881f68fSopenharmony_ci
7996881f68fSopenharmony_ci        # Add zeros at the end
8006881f68fSopenharmony_ci        os.ftruncate(fd, size + 1024)
8016881f68fSopenharmony_ci        assert os.fstat(fd).st_size == size + 1024
8026881f68fSopenharmony_ci        fh.seek(0)
8036881f68fSopenharmony_ci        assert fh.read(size) == TEST_DATA
8046881f68fSopenharmony_ci        assert fh.read(1025) == b'\0' * 1024
8056881f68fSopenharmony_ci
8066881f68fSopenharmony_ci        # Truncate data
8076881f68fSopenharmony_ci        os.ftruncate(fd, size - 1024)
8086881f68fSopenharmony_ci        assert os.fstat(fd).st_size == size - 1024
8096881f68fSopenharmony_ci        fh.seek(0)
8106881f68fSopenharmony_ci        assert fh.read(size) == TEST_DATA[:size-1024]
8116881f68fSopenharmony_ci
8126881f68fSopenharmony_cidef tst_utimens(mnt_dir, ns_tol=0):
8136881f68fSopenharmony_ci    filename = pjoin(mnt_dir, name_generator())
8146881f68fSopenharmony_ci    os.mkdir(filename)
8156881f68fSopenharmony_ci    fstat = os.lstat(filename)
8166881f68fSopenharmony_ci
8176881f68fSopenharmony_ci    atime = fstat.st_atime + 42.28
8186881f68fSopenharmony_ci    mtime = fstat.st_mtime - 42.23
8196881f68fSopenharmony_ci    if sys.version_info < (3,3):
8206881f68fSopenharmony_ci        os.utime(filename, (atime, mtime))
8216881f68fSopenharmony_ci    else:
8226881f68fSopenharmony_ci        atime_ns = fstat.st_atime_ns + int(42.28*1e9)
8236881f68fSopenharmony_ci        mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
8246881f68fSopenharmony_ci        os.utime(filename, None, ns=(atime_ns, mtime_ns))
8256881f68fSopenharmony_ci
8266881f68fSopenharmony_ci    fstat = os.lstat(filename)
8276881f68fSopenharmony_ci
8286881f68fSopenharmony_ci    assert abs(fstat.st_atime - atime) < 1
8296881f68fSopenharmony_ci    assert abs(fstat.st_mtime - mtime) < 1
8306881f68fSopenharmony_ci    if sys.version_info >= (3,3):
8316881f68fSopenharmony_ci        assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol
8326881f68fSopenharmony_ci        assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol
8336881f68fSopenharmony_ci
8346881f68fSopenharmony_cidef tst_passthrough(src_dir, mnt_dir):
8356881f68fSopenharmony_ci    name = name_generator()
8366881f68fSopenharmony_ci    src_name = pjoin(src_dir, name)
8376881f68fSopenharmony_ci    mnt_name = pjoin(src_dir, name)
8386881f68fSopenharmony_ci    assert name not in os.listdir(src_dir)
8396881f68fSopenharmony_ci    assert name not in os.listdir(mnt_dir)
8406881f68fSopenharmony_ci    with open(src_name, 'w') as fh:
8416881f68fSopenharmony_ci        fh.write('Hello, world')
8426881f68fSopenharmony_ci    assert name in os.listdir(src_dir)
8436881f68fSopenharmony_ci    assert name in os.listdir(mnt_dir)
8446881f68fSopenharmony_ci    assert os.stat(src_name) == os.stat(mnt_name)
8456881f68fSopenharmony_ci
8466881f68fSopenharmony_ci    name = name_generator()
8476881f68fSopenharmony_ci    src_name = pjoin(src_dir, name)
8486881f68fSopenharmony_ci    mnt_name = pjoin(src_dir, name)
8496881f68fSopenharmony_ci    assert name not in os.listdir(src_dir)
8506881f68fSopenharmony_ci    assert name not in os.listdir(mnt_dir)
8516881f68fSopenharmony_ci    with open(mnt_name, 'w') as fh:
8526881f68fSopenharmony_ci        fh.write('Hello, world')
8536881f68fSopenharmony_ci    assert name in os.listdir(src_dir)
8546881f68fSopenharmony_ci    assert name in os.listdir(mnt_dir)
8556881f68fSopenharmony_ci    assert os.stat(src_name) == os.stat(mnt_name)
8566881f68fSopenharmony_ci
8576881f68fSopenharmony_ci
8586881f68fSopenharmony_cidef tst_xattr(mnt_dir):
8596881f68fSopenharmony_ci    path = os.path.join(mnt_dir, 'hello')
8606881f68fSopenharmony_ci    os.setxattr(path, b'hello_ll_setxattr_name', b'hello_ll_setxattr_value')
8616881f68fSopenharmony_ci    assert os.getxattr(path, b'hello_ll_getxattr_name') == b'hello_ll_getxattr_value'
8626881f68fSopenharmony_ci    os.removexattr(path, b'hello_ll_removexattr_name')
8636881f68fSopenharmony_ci
8646881f68fSopenharmony_ci
8656881f68fSopenharmony_ci# avoid warning about unused import
8666881f68fSopenharmony_ciassert test_printcap
867