xref: /third_party/python/Lib/test/test_os.py (revision 7db96d56)
1# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
3# portable than they had been thought to be.
4
5import asyncio
6import codecs
7import contextlib
8import decimal
9import errno
10import fnmatch
11import fractions
12import itertools
13import locale
14import os
15import pickle
16import select
17import shutil
18import signal
19import socket
20import stat
21import struct
22import subprocess
23import sys
24import sysconfig
25import tempfile
26import textwrap
27import time
28import types
29import unittest
30import uuid
31import warnings
32from test import support
33from test.support import import_helper
34from test.support import os_helper
35from test.support import socket_helper
36from test.support import warnings_helper
37from platform import win32_is_iot
38
39try:
40    import resource
41except ImportError:
42    resource = None
43try:
44    import fcntl
45except ImportError:
46    fcntl = None
47try:
48    import _winapi
49except ImportError:
50    _winapi = None
51try:
52    import pwd
53    all_users = [u.pw_uid for u in pwd.getpwall()]
54except (ImportError, AttributeError):
55    all_users = []
56try:
57    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
58except ImportError:
59    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
60
61try:
62    import mmap
63except ImportError:
64    mmap = None
65
66from test.support.script_helper import assert_python_ok
67from test.support import unix_shell
68from test.support.os_helper import FakePath
69
70
# True only when os.geteuid() exists and reports the superuser (uid 0).
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = False
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD.
HAVE_WHEEL_GROUP = False
if sys.platform.startswith('freebsd'):
    HAVE_WHEEL_GROUP = (os.getgid() == 0)
86
87
def requires_os_func(name):
    """Return a decorator that skips the test unless ``os`` has *name*."""
    available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(available, reason)
90
91
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* into it.

    The file is opened unbuffered in exclusive-creation binary mode ("xb"),
    so the call fails if *filename* already exists.
    """
    stream = open(filename, mode="xb", buffering=0)
    with stream:
        stream.write(content)
95
96
# bpo-41625: On AIX, splice() only works with a socket, not with a pipe,
# so tests decorated with this are skipped on that platform.
requires_splice_pipe = unittest.skipIf(
    sys.platform.startswith("aix"),
    'on AIX, splice() only accepts sockets',
)
100
101
def tearDownModule():
    """Module-level teardown hook: reset the asyncio event loop policy."""
    policy = None
    asyncio.set_event_loop_policy(policy)
104
105
class MiscTests(unittest.TestCase):
    """Tests for os.getcwd() and os.getcwdb()."""

    def test_getcwd(self):
        # The current directory is reported as text.
        self.assertIsInstance(os.getcwd(), str)

    def test_getcwd_long_path(self):
        # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
        # Windows, MAX_PATH is defined as 260 characters, but Windows supports
        # longer path if longer paths support is enabled. Internally, the os
        # module uses MAXPATHLEN which is at least 1024.
        #
        # Use a directory name of 200 characters to fit into Windows MAX_PATH
        # limit.
        #
        # On Windows, the test can stop when trying to create a path longer
        # than MAX_PATH if long paths support is disabled:
        # see RtlAreLongPathsEnabled().
        #
        # On VxWorks, PATH_MAX is defined as 1024 bytes. Creating a path
        # longer than PATH_MAX will fail, so a shorter target is used there.
        on_vxworks = (sys.platform == 'vxworks')
        min_len = 1000 if on_vxworks else 2000   # characters
        dirlen = 200                             # characters
        # Pad the prefix with 'a' up to dirlen characters.
        dirname = 'python_test_dir_'.ljust(dirlen, 'a')

        with tempfile.TemporaryDirectory() as tmpdir, \
             os_helper.change_cwd(tmpdir) as path:
            expected = path

            while True:
                cwd = os.getcwd()
                self.assertEqual(cwd, expected)

                need = min_len - (len(cwd) + len(os.path.sep))
                if need <= 0:
                    break
                # need is positive here: never overshoot the target length
                # (slicing is a no-op when dirname is already short enough).
                dirname = dirname[:need]

                path = os.path.join(path, dirname)
                try:
                    os.mkdir(path)
                    # On Windows, chdir() can fail
                    # even if mkdir() succeeded
                    os.chdir(path)
                except FileNotFoundError:
                    # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
                    # ERROR_FILENAME_EXCED_RANGE (206) errors
                    # ("The filename or extension is too long")
                    break
                except OSError as exc:
                    # Only "name too long" ends the test quietly.
                    if exc.errno != errno.ENAMETOOLONG:
                        raise
                    break

                expected = path

            if support.verbose:
                print(f"Tested current directory length: {len(cwd)}")

    def test_getcwdb(self):
        # The bytes variant must decode to the same path as getcwd().
        raw_cwd = os.getcwdb()
        self.assertIsInstance(raw_cwd, bytes)
        self.assertEqual(os.fsdecode(raw_cwd), os.getcwd())
172
173
174# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for fd-level os functions; most of them create TESTFN."""

    def setUp(self):
        # Make sure no TESTFN is left over; lexists() is used so that a
        # symbolic link is removed even when its target does not exist.
        if os.path.lexists(os_helper.TESTFN):
            os.unlink(os_helper.TESTFN)
    tearDown = setUp

    def _call_or_skip_enosys(self, func, *args, **kwargs):
        """Return func(*args, **kwargs), skipping the test on ENOSYS.

        ENOSYS handles the case in which Python was compiled in a system
        with the syscall but without support in the kernel.  Any other
        OSError is propagated unchanged.
        """
        try:
            return func(*args, **kwargs)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            self.skipTest(exc)

    def test_access(self):
        # A freshly created file must be reported as writable.
        fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(os_helper.TESTFN, os.W_OK))

    @unittest.skipIf(
        support.is_emscripten, "Test is unstable under Emscripten."
    )
    @unittest.skipIf(
        support.is_wasi, "WASI does not support dup."
    )
    def test_closerange(self):
        first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = os_helper.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly the bytes written through the file
        # object once they are flushed and the fd is rewound.
        with open(os_helper.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(os_helper.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a write or assertion fails.
            os.close(fd)
        with open(os_helper.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(), close it.

        *args* are forwarded to os.fdopen() after the descriptor.
        """
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        with os.fdopen(fd, *args, encoding="utf-8"):
            pass

    def test_fdopen(self):
        # os.fdopen() works with no extra argument, with a mode, and with
        # a mode plus a buffer size.
        fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() overwrites an existing destination and removes
        # the source.
        TESTFN2 = os_helper.TESTFN + ".2"
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os_helper.unlink, TESTFN2)

        create_file(os_helper.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(os_helper.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN)
        with open(TESTFN2, 'r', encoding='utf-8') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # All os.open() parameters can be passed by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # All os.symlink() parameters can be passed by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=os_helper.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        # A negative byte count is rejected.
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = os_helper.TESTFN + ".3"
        data = b'0123456789'

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        self.addCleanup(os_helper.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        i = self._call_or_skip_enosys(os.copy_file_range, in_fd, out_fd, 5)

        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, 6))

        with open(TESTFN2, 'rb') as copied:
            self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = os_helper.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(os_helper.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        i = self._call_or_skip_enosys(os.copy_file_range,
                                      in_fd, out_fd, bytes_to_copy,
                                      offset_src=in_skip,
                                      offset_dst=out_seek)

        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, bytes_to_copy+1))

        with open(TESTFN4, 'rb') as copied:
            read = copied.read()
        # seeked bytes (5) are zero'ed
        self.assertEqual(read[:out_seek], b'\x00'*out_seek)
        # 012 are skipped (in_skip)
        # 345678 are copied in the file (in_skip + bytes_to_copy)
        self.assertEqual(read[out_seek:],
                         data[in_skip:in_skip+i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    def test_splice_invalid_values(self):
        # A negative byte count is rejected.
        with self.assertRaises(ValueError):
            os.splice(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    @requires_splice_pipe
    def test_splice(self):
        data = b'0123456789'

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        read_fd, write_fd = os.pipe()
        self.addCleanup(lambda: os.close(read_fd))
        self.addCleanup(lambda: os.close(write_fd))

        i = self._call_or_skip_enosys(os.splice, in_fd, write_fd, 5)

        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, 6))

        self.assertEqual(os.read(read_fd, 100), data[:i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    @requires_splice_pipe
    def test_splice_offset_in(self):
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        read_fd, write_fd = os.pipe()
        self.addCleanup(lambda: os.close(read_fd))
        self.addCleanup(lambda: os.close(write_fd))

        i = self._call_or_skip_enosys(os.splice, in_fd, write_fd,
                                      bytes_to_copy, offset_src=in_skip)

        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, bytes_to_copy+1))

        read = os.read(read_fd, 100)
        # 012 are skipped (in_skip)
        # 345678 are copied into the pipe (in_skip + bytes_to_copy)
        self.assertEqual(read, data[in_skip:in_skip+i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    @requires_splice_pipe
    def test_splice_offset_out(self):
        TESTFN4 = os_helper.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        out_seek = 3

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        read_fd, write_fd = os.pipe()
        self.addCleanup(lambda: os.close(read_fd))
        self.addCleanup(lambda: os.close(write_fd))
        # The pipe, not TESTFN, is the data source of this test.
        os.write(write_fd, data)

        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(os_helper.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        i = self._call_or_skip_enosys(os.splice, read_fd, out_fd,
                                      bytes_to_copy, offset_dst=out_seek)

        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, bytes_to_copy+1))

        with open(TESTFN4, 'rb') as copied:
            read = copied.read()
        # the out_seek (3) bytes before the destination offset are zero'ed
        self.assertEqual(read[:out_seek], b'\x00'*out_seek)
        # the first i bytes taken from the pipe follow the offset
        self.assertEqual(read[out_seek:], data[:i])
522
523
524# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the results returned by os.stat() and os.statvfs()."""

    def setUp(self):
        # Each test works on a 3-byte TESTFN that is removed afterwards.
        self.fname = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def check_stat_attributes(self, fname):
        """Validate the os.stat() result of *fname*.

        *fname* is the path of the file created by setUp(); callers in this
        class pass it either as str or encoded as bytes.
        """
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Time attributes are truncated to an int before being
                # compared with the indexed field.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end of the result fails.
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): a TypeError is tolerated here but not required,
        # exactly as before; presumably the accepted length varies by
        # platform -- confirm before tightening this check.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        # Same checks with the path encoded for the filesystem.
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        # stat_result must survive a pickle round trip with every protocol.
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = os.statvfs(self.fname)

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for index, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[index])

        self.assertIsInstance(result.f_fsid, int)

        # Test that the size of the tuple doesn't change
        self.assertEqual(len(result), 10)

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): a TypeError is tolerated here but not required,
        # exactly as before -- confirm before tightening this check.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        # statvfs_result must survive a pickle round trip with every
        # protocol.
        result = os.statvfs(self.fname)

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that *result* has an st_file_attributes int in 0..2**32-1."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertGreaterEqual(result.st_file_attributes, 0)
        self.assertLessEqual(result.st_file_attributes, 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = os_helper.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(os_helper.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_stat_block_device(self):
        # bpo-38030: os.stat fails for block devices
        # Test a filename like "//./C:"
        fname = "//./" + os.path.splitdrive(os.getcwd())[0]
        result = os.stat(fname)
        self.assertEqual(result.st_mode, stat.S_IFBLK)
751
752
753class UtimeTests(unittest.TestCase):
754    def setUp(self):
755        self.dirname = os_helper.TESTFN
756        self.fname = os.path.join(self.dirname, "f1")
757
758        self.addCleanup(os_helper.rmtree, self.dirname)
759        os.mkdir(self.dirname)
760        create_file(self.fname)
761
762    def support_subsecond(self, filename):
763        # Heuristic to check if the filesystem supports timestamp with
764        # subsecond resolution: check if float and int timestamps are different
765        st = os.stat(filename)
766        return ((st.st_atime != st[7])
767                or (st.st_mtime != st[8])
768                or (st.st_ctime != st[9]))
769
770    def _test_utime(self, set_time, filename=None):
771        if not filename:
772            filename = self.fname
773
774        support_subsecond = self.support_subsecond(filename)
775        if support_subsecond:
776            # Timestamp with a resolution of 1 microsecond (10^-6).
777            #
778            # The resolution of the C internal function used by os.utime()
779            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
780            # test with a resolution of 1 ns requires more work:
781            # see the issue #15745.
782            atime_ns = 1002003000   # 1.002003 seconds
783            mtime_ns = 4005006000   # 4.005006 seconds
784        else:
785            # use a resolution of 1 second
786            atime_ns = 5 * 10**9
787            mtime_ns = 8 * 10**9
788
789        set_time(filename, (atime_ns, mtime_ns))
790        st = os.stat(filename)
791
792        if support_subsecond:
793            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
794            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
795        else:
796            self.assertEqual(st.st_atime, atime_ns * 1e-9)
797            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
798        self.assertEqual(st.st_atime_ns, atime_ns)
799        self.assertEqual(st.st_mtime_ns, mtime_ns)
800
801    def test_utime(self):
802        def set_time(filename, ns):
803            # test the ns keyword parameter
804            os.utime(filename, ns=ns)
805        self._test_utime(set_time)
806
807    @staticmethod
808    def ns_to_sec(ns):
809        # Convert a number of nanosecond (int) to a number of seconds (float).
810        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
811        # issue, os.utime() rounds towards minus infinity.
812        return (ns * 1e-9) + 0.5e-9
813
814    def test_utime_by_indexed(self):
815        # pass times as floating point seconds as the second indexed parameter
816        def set_time(filename, ns):
817            atime_ns, mtime_ns = ns
818            atime = self.ns_to_sec(atime_ns)
819            mtime = self.ns_to_sec(mtime_ns)
820            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
821            # or utime(time_t)
822            os.utime(filename, (atime, mtime))
823        self._test_utime(set_time)
824
825    def test_utime_by_times(self):
826        def set_time(filename, ns):
827            atime_ns, mtime_ns = ns
828            atime = self.ns_to_sec(atime_ns)
829            mtime = self.ns_to_sec(mtime_ns)
830            # test the times keyword parameter
831            os.utime(filename, times=(atime, mtime))
832        self._test_utime(set_time)
833
834    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
835                         "follow_symlinks support for utime required "
836                         "for this test.")
837    def test_utime_nofollow_symlinks(self):
838        def set_time(filename, ns):
839            # use follow_symlinks=False to test utimensat(timespec)
840            # or lutimes(timeval)
841            os.utime(filename, ns=ns, follow_symlinks=False)
842        self._test_utime(set_time)
843
844    @unittest.skipUnless(os.utime in os.supports_fd,
845                         "fd support for utime required for this test.")
846    def test_utime_fd(self):
847        def set_time(filename, ns):
848            with open(filename, 'wb', 0) as fp:
849                # use a file descriptor to test futimens(timespec)
850                # or futimes(timeval)
851                os.utime(fp.fileno(), ns=ns)
852        self._test_utime(set_time)
853
854    @unittest.skipUnless(os.utime in os.supports_dir_fd,
855                         "dir_fd support for utime required for this test.")
856    def test_utime_dir_fd(self):
857        def set_time(filename, ns):
858            dirname, name = os.path.split(filename)
859            with os_helper.open_dir_fd(dirname) as dirfd:
860                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
861                os.utime(name, dir_fd=dirfd, ns=ns)
862        self._test_utime(set_time)
863
864    def test_utime_directory(self):
865        def set_time(filename, ns):
866            # test calling os.utime() on a directory
867            os.utime(filename, ns=ns)
868        self._test_utime(set_time, filename=self.dirname)
869
870    def _test_utime_current(self, set_time):
871        # Get the system clock
872        current = time.time()
873
874        # Call os.utime() to set the timestamp to the current system clock
875        set_time(self.fname)
876
877        if not self.support_subsecond(self.fname):
878            delta = 1.0
879        else:
880            # On Windows, the usual resolution of time.time() is 15.6 ms.
881            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
882            #
883            # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
884            # also 50 ms on other platforms.
885            delta = 0.050
886        st = os.stat(self.fname)
887        msg = ("st_time=%r, current=%r, dt=%r"
888               % (st.st_mtime, current, st.st_mtime - current))
889        self.assertAlmostEqual(st.st_mtime, current,
890                               delta=delta, msg=msg)
891
892    def test_utime_current(self):
893        def set_time(filename):
894            # Set to the current time in the new way
895            os.utime(self.fname)
896        self._test_utime_current(set_time)
897
898    def test_utime_current_old(self):
899        def set_time(filename):
900            # Set to the current time in the old explicit way.
901            os.utime(self.fname, None)
902        self._test_utime_current(set_time)
903
904    def get_file_system(self, path):
905        if sys.platform == 'win32':
906            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
907            import ctypes
908            kernel32 = ctypes.windll.kernel32
909            buf = ctypes.create_unicode_buffer("", 100)
910            ok = kernel32.GetVolumeInformationW(root, None, 0,
911                                                None, None, None,
912                                                buf, len(buf))
913            if ok:
914                return buf.value
915        # return None if the filesystem is unknown
916
917    def test_large_time(self):
918        # Many filesystems are limited to the year 2038. At least, the test
919        # pass with NTFS filesystem.
920        if self.get_file_system(self.dirname) != "NTFS":
921            self.skipTest("requires NTFS")
922
923        large = 5000000000   # some day in 2128
924        os.utime(self.fname, (large, large))
925        self.assertEqual(os.stat(self.fname).st_mtime, large)
926
927    def test_utime_invalid_arguments(self):
928        # seconds and nanoseconds parameters are mutually exclusive
929        with self.assertRaises(ValueError):
930            os.utime(self.fname, (5, 5), ns=(5, 5))
931        with self.assertRaises(TypeError):
932            os.utime(self.fname, [5, 5])
933        with self.assertRaises(TypeError):
934            os.utime(self.fname, (5,))
935        with self.assertRaises(TypeError):
936            os.utime(self.fname, (5, 5, 5))
937        with self.assertRaises(TypeError):
938            os.utime(self.fname, ns=[5, 5])
939        with self.assertRaises(TypeError):
940            os.utime(self.fname, ns=(5,))
941        with self.assertRaises(TypeError):
942            os.utime(self.fname, ns=(5, 5, 5))
943
944        if os.utime not in os.supports_follow_symlinks:
945            with self.assertRaises(NotImplementedError):
946                os.utime(self.fname, (5, 5), follow_symlinks=False)
947        if os.utime not in os.supports_fd:
948            with open(self.fname, 'wb', 0) as fp:
949                with self.assertRaises(TypeError):
950                    os.utime(fp.fileno(), (5, 5))
951        if os.utime not in os.supports_dir_fd:
952            with self.assertRaises(NotImplementedError):
953                os.utime(self.fname, (5, 5), dir_fd=0)
954
955    @support.cpython_only
956    def test_issue31577(self):
957        # The interpreter shouldn't crash in case utime() received a bad
958        # ns argument.
959        def get_bad_int(divmod_ret_val):
960            class BadInt:
961                def __divmod__(*args):
962                    return divmod_ret_val
963            return BadInt()
964        with self.assertRaises(TypeError):
965            os.utime(self.fname, ns=(get_bad_int(42), 1))
966        with self.assertRaises(TypeError):
967            os.utime(self.fname, ns=(get_bad_int(()), 1))
968        with self.assertRaises(TypeError):
969            os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
970
971
972from test import mapping_tests
973
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the str (and, where supported, bytes) environment so that
        # tearDown() can restore it, then install the reference items.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for name, text in self._reference().items():
            os.environ[name] = text

    def tearDown(self):
        # Drop whatever the test left behind and reinstate the snapshots.
        environ = os.environ
        environ.clear()
        environ.update(self.__save)
        if os.supports_bytes_environ:
            environb = os.environb
            environb.clear()
            environb.update(self.__saveb)

    def _reference(self):
        # Sample items installed by setUp() and checked by test_items().
        return dict(KEY1="VALUE1", KEY2="VALUE2", KEY3="VALUE3")

    def _empty_mapping(self):
        # Empty os.environ in place and hand the very same object back.
        environ = os.environ
        environ.clear()
        return environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
    @support.requires_subprocess()
    def test_update2(self):
        # A variable set through os.environ.update() must be visible to a
        # shell started afterwards.
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as pipe:
            output = pipe.read()
            self.assertEqual(output.strip(), "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
    @support.requires_subprocess()
    def test_os_popen_iter(self):
        # The object returned by os.popen() iterates over output lines.
        command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
        with os.popen(command) as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            with self.assertRaises(StopIteration):
                next(lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for item in os.environ.items():
            for part in item:
                self.assertEqual(type(part), str)

    def test_items(self):
        # Every reference item installed by setUp() must be readable.
        for name, text in self._reference().items():
            self.assertEqual(os.environ.get(name), text)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = [f"{key!r}: {value!r}" for key, value in env.items()]
        expected = "environ({" + ", ".join(pairs) + "})"
        self.assertEqual(repr(env), expected)

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        wanted = ['/monty', '/python', '', '/flying/circus']
        custom_env = {'PATH': os.pathsep.join(wanted)}

        # Temporarily swap os.environ for a plain dict to test that
        # defaulting to os.environ works.
        original_environ = os.environ
        try:
            os.environ = dict(custom_env)
            for kwargs in ({}, {'env': None}):
                self.assertSequenceEqual(wanted, os.get_exec_path(**kwargs))
        finally:
            os.environ = original_environ

        cases = (
            (default_dirs, {}),            # no PATH environment variable
            (('',), {'PATH': ''}),         # empty PATH environment variable
            (wanted, custom_env),          # supplied PATH environment variable
        )
        for expected, env in cases:
            self.assertSequenceEqual(expected, os.get_exec_path(env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            with self.assertRaises(ValueError):
                os.get_exec_path(mixed_env)

        # bytes key and/or value
        for env in ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            encoded = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            self.skipTest("U+20AC character is not encodable to %s"
                          % (fs_encoding,))
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], encoded)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        decoded = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], decoded)

    @support.requires_subprocess()
    def test_putenv_unsetenv(self):
        name = "PYTHONTESTVAR"
        value = "testvalue"
        code = f'import os; print(repr(os.environ.get({name!r})))'

        def seen_by_child():
            # Report how a freshly started interpreter sees the variable.
            proc = subprocess.run([sys.executable, '-c', code], check=True,
                                  stdout=subprocess.PIPE, text=True)
            return proc.stdout.rstrip()

        with os_helper.EnvironmentVarGuard() as env:
            env.pop(name, None)

            os.putenv(name, value)
            self.assertEqual(seen_by_child(), repr(value))

            os.unsetenv(name)
            self.assertEqual(seen_by_child(), repr(None))

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_putenv_unsetenv_error(self):
        # Empty variable name is invalid.
        # "=" and null character are not allowed in a variable name.
        invalid_names = ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me')
        rejected = (OSError, ValueError)
        for name in invalid_names:
            with self.assertRaises(rejected):
                os.putenv(name, "value")
            with self.assertRaises(rejected):
                os.unsetenv(name)

        if sys.platform == "win32":
            # On Windows, an environment variable string ("name=value" string)
            # is limited to 32,767 characters
            longstr = 'x' * 32_768
            oversized_calls = (
                (os.putenv, longstr, "1"),
                (os.putenv, "X", longstr),
                (os.unsetenv, longstr),
            )
            for call in oversized_calls:
                self.assertRaises(ValueError, *call)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        def lookup():
            os.environ[missing]

        def delete():
            del os.environ[missing]

        # Both operations must raise a KeyError carrying the very same key
        # object, with the exception context suppressed.
        for operation in (lookup, delete):
            with self.assertRaises(KeyError) as caught:
                operation()
            error = caught.exception
            self.assertIs(error.args[0], missing)
            self.assertTrue(error.__suppress_context__)

    def _test_environ_iteration(self, collection):
        # Start iterating over *collection*, add a key to os.environ, then
        # advance the iterator again over the modified mapping.
        new_key = "__new_key__"
        iterator = iter(collection)

        next(iterator)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[new_key] = "test_environ_iteration"

        try:
            next(iterator)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], "test_environ_iteration")
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())

    def _test_underlying_process_env(self, var, expected):
        # Ask a shell to echo $var and compare the output with *expected*.
        # The check is silently skipped without a shell or subprocess
        # support.
        shell_available = unix_shell and os.path.exists(unix_shell)
        if not shell_available or not support.has_subprocess_support:
            return

        with os.popen(f"{unix_shell} -c 'echo ${var}'") as pipe:
            output = pipe.read().strip()

        self.assertEqual(expected, output)

    def test_or_operator(self):
        key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[key] = original_value

        additions = {'_A_': '1', '_B_': '2', key: '3'}
        expected = {**os.environ, **additions}

        merged = os.environ | additions
        self.assertDictEqual(expected, merged)
        self.assertEqual('3', merged[key])

        # An items view is not an acceptable right-hand operand.
        self.assertIs(NotImplemented, os.environ.__or__(additions.items()))

        # The process environment itself must be left untouched.
        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(key, original_value)

    def test_ior_operator(self):
        key = '_TEST_VAR_'
        os.environ[key] = 'original_value'

        additions = {'_A_': '1', '_B_': '2', key: '3'}
        expected = {**os.environ, **additions}

        os.environ |= additions
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[key])

        # The in-place update must reach the process environment.
        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(key, '3')

    def test_ior_operator_invalid_dicts(self):
        snapshot = os.environ.copy()

        # A non-str key, then a non-str value.
        for invalid in ({1: '_A_'}, {'_A_': 1}):
            with self.assertRaises(TypeError):
                os.environ |= invalid

        # Check nothing was added.
        self.assertEqual(snapshot, os.environ)

    def test_ior_operator_key_value_iterable(self):
        key = '_TEST_VAR_'
        os.environ[key] = 'original_value'

        pairs = (('_A_', '1'), ('_B_', '2'), (key, '3'))
        expected = dict(os.environ)
        expected.update(pairs)

        os.environ |= pairs
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[key])

        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(key, '3')

    def test_ror_operator(self):
        key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[key] = original_value

        additions = {'_A_': '1', '_B_': '2', key: '3'}
        # os.environ is the right-hand operand, so its values win.
        expected = {**additions, **os.environ}

        merged = additions | os.environ
        self.assertDictEqual(expected, merged)
        self.assertEqual(original_value, merged[key])

        # An items view is not an acceptable left-hand operand.
        self.assertIs(NotImplemented, os.environ.__ror__(additions.items()))

        # The process environment itself must be left untouched.
        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(key, original_value)
1274
1275
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Adapter hiding the minor differences between os.walk and os.fwalk,
    # so that both functions are tested with the same code base.
    def walk(self, top, **kwargs):
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        return os.walk(top, **kwargs)

    def setUp(self):
        self.addCleanup(os_helper.rmtree, os_helper.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TESTFN.2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        base = os_helper.TESTFN
        self.walk_path = os.path.join(base, "TEST1")
        self.sub1_path = os.path.join(self.walk_path, "SUB1")
        self.sub11_path = os.path.join(self.sub1_path, "SUB11")
        sub2_path = os.path.join(self.walk_path, "SUB2")
        sub21_path = os.path.join(sub2_path, "SUB21")
        self.link_path = os.path.join(sub2_path, "link")
        t2_path = os.path.join(base, "TEST2")

        # NOTE(review): the file shown as tmp5 in the diagram above is
        # created on disk as SUB21/tmp3.
        tmp5_path = os.path.join(sub21_path, "tmp3")
        file_paths = (
            os.path.join(self.walk_path, "tmp1"),
            os.path.join(self.sub1_path, "tmp2"),
            os.path.join(sub2_path, "tmp3"),
            os.path.join(base, "TEST2", "tmp4"),
            tmp5_path,
        )

        # Create stuff.
        for directory in (self.sub11_path, sub2_path, sub21_path, t2_path):
            os.makedirs(directory)

        for path in file_paths:
            with open(path, "x", encoding='utf-8') as f:
                f.write(f"I'm {path} and proud of it.  Blame test_os.\n")

        if os_helper.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            broken_links = (
                ("broken_link", 'broken'),
                ("broken_link2", os.path.join('tmp3', 'broken')),
                ("broken_link3", os.path.join('SUB21', 'tmp5')),
            )
            for link_name, target in broken_links:
                os.symlink(target, os.path.join(sub2_path, link_name), True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        if not support.is_emscripten:
            # Emscripten fails with inaccessible directory
            os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # SUB21 really is unreadable: restore access at cleanup time.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            # SUB21 can still be listed, so remove it from disk and from
            # the expected SUB2 entry (it is the first name in its dirs).
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        visited = list(self.walk(self.walk_path))

        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = visited[0][1][0] != "SUB1"
        sub2_index = 3 - 2 * flipped
        visited[0][1].sort()
        visited[sub2_index][-1].sort()
        visited[sub2_index][1].sort()
        self.assertEqual(visited[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(visited[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        top = self.walk_path if walk_path is None else walk_path
        # Prune the search.
        visited = []
        for entry in self.walk(top):
            visited.append(entry)
            subdirs = entry[1]
            # Don't descend into SUB1.
            # Note that this also mutates the dirs we appended to visited!
            try:
                subdirs.remove('SUB1')
            except ValueError:
                pass

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        sub2_entry = visited[1]
        sub2_entry[-1].sort()
        sub2_entry[1].sort()
        self.assertEqual(sub2_entry, self.sub2_tree)

    def test_file_like_path(self):
        # Same pruning scenario, with the top given as a FakePath object.
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        visited = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(visited), 4, visited)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = visited[3][1][0] != "SUB1"
        sub2_index = 2 - 2 * flipped
        visited[3][1].sort()
        visited[sub2_index][-1].sort()
        visited[sub2_index][1].sort()
        self.assertEqual(visited[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[flipped], (self.sub11_path, [], []))
        self.assertEqual(visited[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[sub2_index], self.sub2_tree)

    def test_walk_symlink(self):
        if not os_helper.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        for root, dirs, files in self.walk(self.walk_path,
                                           follow_symlinks=True):
            if root != self.link_path:
                continue
            self.assertEqual(dirs, [])
            self.assertEqual(files, ["tmp4"])
            break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])

        # Rename SUB1 while the walk is suspended: it was already listed,
        # but can no longer be entered under its old name.
        renamed_dir = 'SUB1'
        old_path = os.path.join(root, renamed_dir)
        new_path = os.path.join(root, renamed_dir + '.new')
        os.rename(old_path, new_path)
        try:
            roots = [entry[0] for entry in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(old_path, roots)
            self.assertNotIn(new_path, roots)
            for other in dirs:
                if other == renamed_dir:
                    continue
                self.assertIn(os.path.join(root, other), roots)
        finally:
            os.rename(new_path, old_path)

    def test_walk_many_open_files(self):
        depth = 30
        base = os.path.join(os_helper.TESTFN, 'deep')
        deepest = os.path.join(base, *(['d'] * depth))
        os.makedirs(deepest)

        # Advance 100 bottom-up walkers in lockstep, from the deepest
        # directory back up to the base.
        walkers = [self.walk(base, topdown=False) for _ in range(100)]
        current = deepest
        for level in range(depth + 1):
            expected = (current, ['d'] if level else [], [])
            for walker in walkers:
                self.assertEqual(next(walker), expected)
            current = os.path.dirname(current)

        # Same again top-down, from the base to the deepest directory.
        walkers = [self.walk(base, topdown=True) for _ in range(100)]
        current = base
        for level in range(depth + 1):
            expected = (current, ['d'] if level < depth else [], [])
            for walker in walkers:
                self.assertEqual(next(walker), expected)
            current = os.path.join(current, 'd')
1472
1473
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk() to the 3-tuples expected by the inherited WalkTests
        # by dropping the directory file descriptor.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Single entry point to os.fwalk(), so that a subclass can wrap it.
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """Compare fwalk() results with os.walk() results.

        *walk_kwargs* and *fwalk_kwargs* are the keyword arguments for
        os.walk() and self.fwalk() respectively.  Both are copied, then run
        with every combination of topdown and followlinks/follow_symlinks.
        Every root yielded by fwalk() must also have been yielded by walk(),
        with the same sets of directory and file names.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            # Index the walk() results by root; order is irrelevant, so the
            # names are compared as sets.
            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': os_helper.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide the real error behind a NameError on ``fd``.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': os_helper.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = os_helper.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    @unittest.skipIf(
        support.is_emscripten, "Cannot dup stdout on Emscripten"
    )
    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(os_helper.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    # fwalk() keeps file descriptors open, so the inherited test is disabled.
    test_walk_many_open_files = None
1547
1548
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        # Run os.walk() on the bytes form of *top*, but expose str names to
        # the inherited tests.
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            # Copy any edit the caller made to the decoded lists back into
            # the lists owned by os.walk().
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1561
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        # Run os.fwalk() on the bytes form of *top*, but expose str names
        # to the inherited tests.
        for raw_root, raw_dirs, raw_files, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files, topfd)
            # Copy any edit the caller made to the decoded lists back into
            # the lists owned by os.fwalk().
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1573
1574
1575class MakedirTests(unittest.TestCase):
1576    def setUp(self):
1577        os.mkdir(os_helper.TESTFN)
1578
1579    def test_makedir(self):
1580        base = os_helper.TESTFN
1581        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1582        os.makedirs(path)             # Should work
1583        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1584        os.makedirs(path)
1585
1586        # Try paths with a '.' in them
1587        self.assertRaises(OSError, os.makedirs, os.curdir)
1588        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1589        os.makedirs(path)
1590        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1591                            'dir5', 'dir6')
1592        os.makedirs(path)
1593
1594    @unittest.skipIf(
1595        support.is_emscripten or support.is_wasi,
1596        "Emscripten's/WASI's umask is a stub."
1597    )
1598    def test_mode(self):
1599        with os_helper.temp_umask(0o002):
1600            base = os_helper.TESTFN
1601            parent = os.path.join(base, 'dir1')
1602            path = os.path.join(parent, 'dir2')
1603            os.makedirs(path, 0o555)
1604            self.assertTrue(os.path.exists(path))
1605            self.assertTrue(os.path.isdir(path))
1606            if os.name != 'nt':
1607                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1608                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
1609
1610    @unittest.skipIf(
1611        support.is_emscripten or support.is_wasi,
1612        "Emscripten's/WASI's umask is a stub."
1613    )
1614    def test_exist_ok_existing_directory(self):
1615        path = os.path.join(os_helper.TESTFN, 'dir1')
1616        mode = 0o777
1617        old_mask = os.umask(0o022)
1618        os.makedirs(path, mode)
1619        self.assertRaises(OSError, os.makedirs, path, mode)
1620        self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
1621        os.makedirs(path, 0o776, exist_ok=True)
1622        os.makedirs(path, mode=mode, exist_ok=True)
1623        os.umask(old_mask)
1624
1625        # Issue #25583: A drive root could raise PermissionError on Windows
1626        os.makedirs(os.path.abspath('/'), exist_ok=True)
1627
1628    @unittest.skipIf(
1629        support.is_emscripten or support.is_wasi,
1630        "Emscripten's/WASI's umask is a stub."
1631    )
1632    def test_exist_ok_s_isgid_directory(self):
1633        path = os.path.join(os_helper.TESTFN, 'dir1')
1634        S_ISGID = stat.S_ISGID
1635        mode = 0o777
1636        old_mask = os.umask(0o022)
1637        try:
1638            existing_testfn_mode = stat.S_IMODE(
1639                    os.lstat(os_helper.TESTFN).st_mode)
1640            try:
1641                os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID)
1642            except PermissionError:
1643                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
1644            if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID):
1645                raise unittest.SkipTest('No support for S_ISGID dir mode.')
1646            # The os should apply S_ISGID from the parent dir for us, but
1647            # this test need not depend on that behavior.  Be explicit.
1648            os.makedirs(path, mode | S_ISGID)
1649            # http://bugs.python.org/issue14992
1650            # Should not fail when the bit is already set.
1651            os.makedirs(path, mode, exist_ok=True)
1652            # remove the bit.
1653            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
1654            # May work even when the bit is not already set when demanded.
1655            os.makedirs(path, mode | S_ISGID, exist_ok=True)
1656        finally:
1657            os.umask(old_mask)
1658
1659    def test_exist_ok_existing_regular_file(self):
1660        base = os_helper.TESTFN
1661        path = os.path.join(os_helper.TESTFN, 'dir1')
1662        with open(path, 'w', encoding='utf-8') as f:
1663            f.write('abc')
1664        self.assertRaises(OSError, os.makedirs, path)
1665        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1666        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1667        os.remove(path)
1668
1669    @unittest.skipUnless(os.name == 'nt', "requires Windows")
1670    def test_win32_mkdir_700(self):
1671        base = os_helper.TESTFN
1672        path = os.path.abspath(os.path.join(os_helper.TESTFN, 'dir'))
1673        os.mkdir(path, mode=0o700)
1674        out = subprocess.check_output(["cacls.exe", path, "/s"], encoding="oem")
1675        os.rmdir(path)
1676        self.assertEqual(
1677            out.strip(),
1678            f'{path} "D:P(A;OICI;FA;;;SY)(A;OICI;FA;;;BA)(A;OICI;FA;;;OW)"',
1679        )
1680
1681    def tearDown(self):
1682        path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3',
1683                            'dir4', 'dir5', 'dir6')
1684        # If the tests failed, the bottom-most directory ('../dir6')
1685        # may not have been created, so we look for the outermost directory
1686        # that exists.
1687        while not os.path.exists(path) and path != os_helper.TESTFN:
1688            path = os.path.dirname(path)
1689
1690        os.removedirs(path)
1691
1692
@os_helper.skip_unless_working_chmod
class ChownFileTests(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        # One directory, TESTFN, is shared by every test in the class.
        os.mkdir(os_helper.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(os_helper.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        st = os.stat(os_helper.TESTFN)
        uid, gid = st.st_uid, st.st_gid
        # float, complex, Decimal and Fraction values must be rejected in
        # either position.
        rejected = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in rejected:
            with self.assertRaises(TypeError):
                os.chown(os_helper.TESTFN, value, gid)
            with self.assertRaises(TypeError):
                os.chown(os_helper.TESTFN, uid, value)
        self.assertIsNone(os.chown(os_helper.TESTFN, uid, gid))
        self.assertIsNone(os.chown(os_helper.TESTFN, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        uid = os.stat(os_helper.TESTFN).st_uid
        # Switch to each of the first two groups in turn and read it back.
        for wanted_gid in groups[:2]:
            os.chown(os_helper.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(os_helper.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        gid = os.stat(os_helper.TESTFN).st_gid
        # Hand the directory to each of the first two users in turn.
        for wanted_uid in all_users[:2]:
            os.chown(os_helper.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(os_helper.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        first_uid, second_uid = all_users[:2]
        gid = os.stat(os_helper.TESTFN).st_gid
        # Both calls sit in one assertRaises block: the second one only runs
        # when the first did not raise, and the block passes as soon as
        # either raises PermissionError.
        # NOTE(review): presumably this covers the case where one of the two
        # uids is the current user -- confirm before splitting the block.
        with self.assertRaises(PermissionError):
            os.chown(os_helper.TESTFN, first_uid, gid)
            os.chown(os_helper.TESTFN, second_uid, gid)
1751
1752
class RemoveDirsTests(unittest.TestCase):
    def setUp(self):
        os.makedirs(os_helper.TESTFN)

    def tearDown(self):
        os_helper.rmtree(os_helper.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira/dirb and return both paths, outer one first.
        outer = os.path.join(os_helper.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Nothing but empty directories: the whole chain goes away.
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for removed in (dirb, dira, os_helper.TESTFN):
            self.assertFalse(os.path.exists(removed))

    def test_remove_partial(self):
        # A file next to the leaf keeps its parent, and everything above.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file inside the leaf makes the call fail and leaves all in place.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))
1792
1793
1794@unittest.skipIf(support.is_wasi, "WASI has no /dev/null")
1795class DevNullTests(unittest.TestCase):
1796    def test_devnull(self):
1797        with open(os.devnull, 'wb', 0) as f:
1798            f.write(b'hello')
1799            f.close()
1800        with open(os.devnull, 'rb') as f:
1801            self.assertEqual(f.read(), b'')
1802
1803
class URandomTests(unittest.TestCase):
    def test_urandom_length(self):
        # The result always has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        # Run os.urandom(count) in a child interpreter and return the bytes
        # it wrote to stdout, after checking that there are *count* of them.
        lines = [
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        ]
        run = assert_python_ok('-c', '\n'.join(lines))
        stdout = run[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate child processes must not produce the same bytes.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
1833
1834
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Probe the call once; ENOSYS skips the class, other errors propagate.
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        # Only the existence of the flag is checked: calling
        # os.getrandom(1, os.GRND_RANDOM) is avoided so as not to consume
        # the rare resource /dev/random.
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists.
        # BlockingIOError is tolerated: system urandom is not initialized yet.
        with contextlib.suppress(BlockingIOError):
            os.getrandom(1, os.GRND_NONBLOCK)

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1876
1877
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of the corresponding build-time config variables is 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(feature) == 1
    for feature in ('HAVE_GETENTROPY',
                    'HAVE_GETRANDOM',
                    'HAVE_GETRANDOM_SYSCALL'))
1884
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() builds that read from a file descriptor.

    Every scenario runs in a child interpreter, so the descriptor games
    below never touch the test runner's own process.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only a clean exit of the child matters; its output is not inspected.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=os_helper.TESTFN)
        # The child prints two 4-byte samples.  They must differ from each
        # other, and a second run must not repeat the first run's output
        # (TESTFN holds only b"x" bytes, so reading it would repeat).
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1964
1965
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is set to it inside the block.
    os.execv, os.execve and os.defpath are put back on exit.  The value
    bound by ``with ... as`` is the list of recorded calls.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals before entering the try block: if one of these
    # lookups failed inside it, the finally clause would trip over an
    # unbound local and hide the real error.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1998
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument sequence and an empty first argument are both
        # rejected, for tuples and lists alike.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Shared body for the str and bytes variants; *test_type* is either
        # str or bytes and selects the type of the program name.
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Outside Windows the recorded path is expected as bytes.
            native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised outside Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as exc:
            winerror = exc.winerror
            self.assertTrue(winerror is None or winerror != 0)
        else:
            self.fail('No OSError raised')
2103
2104
2105@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2106class Win32ErrorTests(unittest.TestCase):
2107    def setUp(self):
2108        try:
2109            os.stat(os_helper.TESTFN)
2110        except FileNotFoundError:
2111            exists = False
2112        except OSError as exc:
2113            exists = True
2114            self.fail("file %s must not exist; os.stat failed with %s"
2115                      % (os_helper.TESTFN, exc))
2116        else:
2117            self.fail("file %s must not exist" % os_helper.TESTFN)
2118
2119    def test_rename(self):
2120        self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")
2121
2122    def test_remove(self):
2123        self.assertRaises(OSError, os.remove, os_helper.TESTFN)
2124
2125    def test_chdir(self):
2126        self.assertRaises(OSError, os.chdir, os_helper.TESTFN)
2127
2128    def test_mkdir(self):
2129        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
2130
2131        with open(os_helper.TESTFN, "x") as f:
2132            self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)
2133
2134    def test_utime(self):
2135        self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)
2136
2137    def test_chmod(self):
2138        self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
2139
2140
2141@unittest.skipIf(support.is_wasi, "Cannot create invalid FD on WASI.")
2142class TestInvalidFD(unittest.TestCase):
2143    singles = ["fchdir", "dup", "fdatasync", "fstat",
2144               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
2145    #singles.append("close")
2146    #We omit close because it doesn't raise an exception on some platforms
2147    def get_single(f):
2148        def helper(self):
2149            if  hasattr(os, f):
2150                self.check(getattr(os, f))
2151        return helper
2152    for f in singles:
2153        locals()["test_"+f] = get_single(f)
2154
2155    def check(self, f, *args, **kwargs):
2156        try:
2157            f(os_helper.make_bad_fd(), *args, **kwargs)
2158        except OSError as e:
2159            self.assertEqual(e.errno, errno.EBADF)
2160        else:
2161            self.fail("%r didn't raise an OSError with a bad file descriptor"
2162                      % f)
2163
2164    def test_fdopen(self):
2165        self.check(os.fdopen, encoding="utf-8")
2166
2167    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
2168    def test_isatty(self):
2169        self.assertEqual(os.isatty(os_helper.make_bad_fd()), False)
2170
2171    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
2172    def test_closerange(self):
2173        fd = os_helper.make_bad_fd()
2174        # Make sure none of the descriptors we are about to close are
2175        # currently valid (issue 6542).
2176        for i in range(10):
2177            try: os.fstat(fd+i)
2178            except OSError:
2179                pass
2180            else:
2181                break
2182        if i < 2:
2183            raise unittest.SkipTest(
2184                "Unable to acquire a range of invalid file descriptors")
2185        self.assertEqual(os.closerange(fd, fd + i-1), None)
2186
2187    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
2188    def test_dup2(self):
2189        self.check(os.dup2, 20)
2190
2191    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
2192    @unittest.skipIf(
2193        support.is_emscripten,
2194        "dup2() with negative fds is broken on Emscripten (see gh-102179)"
2195    )
2196    def test_dup2_negative_fd(self):
2197        valid_fd = os.open(__file__, os.O_RDONLY)
2198        self.addCleanup(os.close, valid_fd)
2199        fds = [
2200            valid_fd,
2201            -1,
2202            -2**31,
2203        ]
2204        for fd, fd2 in itertools.product(fds, repeat=2):
2205            if fd != fd2:
2206                with self.subTest(fd=fd, fd2=fd2):
2207                    with self.assertRaises(OSError) as ctx:
2208                        os.dup2(fd, fd2)
2209                    self.assertEqual(ctx.exception.errno, errno.EBADF)
2210
2211    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
2212    def test_fchmod(self):
2213        self.check(os.fchmod, 0)
2214
2215    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
2216    def test_fchown(self):
2217        self.check(os.fchown, -1, -1)
2218
2219    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
2220    @unittest.skipIf(
2221        support.is_emscripten or support.is_wasi,
2222        "musl libc issue on Emscripten/WASI, bpo-46390"
2223    )
2224    def test_fpathconf(self):
2225        self.check(os.pathconf, "PC_NAME_MAX")
2226        self.check(os.fpathconf, "PC_NAME_MAX")
2227
2228    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
2229    def test_ftruncate(self):
2230        self.check(os.truncate, 0)
2231        self.check(os.ftruncate, 0)
2232
2233    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
2234    def test_lseek(self):
2235        self.check(os.lseek, 0, 0)
2236
2237    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
2238    def test_read(self):
2239        self.check(os.read, 1)
2240
2241    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2242    def test_readv(self):
2243        buf = bytearray(10)
2244        self.check(os.readv, [buf])
2245
2246    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
2247    def test_tcsetpgrpt(self):
2248        self.check(os.tcsetpgrp, 0)
2249
2250    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
2251    def test_write(self):
2252        self.check(os.write, b" ")
2253
2254    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
2255    def test_writev(self):
2256        self.check(os.writev, [b'abc'])
2257
2258    @support.requires_subprocess()
2259    def test_inheritable(self):
2260        self.check(os.get_inheritable)
2261        self.check(os.set_inheritable, True)
2262
2263    @unittest.skipUnless(hasattr(os, 'get_blocking'),
2264                         'needs os.get_blocking() and os.set_blocking()')
2265    def test_blocking(self):
2266        self.check(os.get_blocking)
2267        self.check(os.set_blocking, True)
2268
2269
@unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = os_helper.TESTFN
        self.file2 = os_helper.TESTFN + "2"

    def tearDown(self):
        for file in (self.file1, self.file2):
            if os.path.exists(file):
                os.unlink(file)

    def _test_link(self, file1, file2):
        # Create file1, link it to file2 and check that both names open
        # the same file.  A PermissionError from os.link() skips the test.
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with open(file1, "rb") as f1, open(file2, "rb") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() is the conversion os functions themselves apply to
        # str paths, so the bytes names match the str ones cleaned up in
        # tearDown().
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Rebind both attributes so that tearDown() removes the new names.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
2307
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Argument validation tests for the os.set*uid() / os.set*gid() calls.

    Each test checks that a non-integer argument raises TypeError and that
    an id of 1 << 32 raises OverflowError.  When the process is not running
    as uid 0, switching to id 0 is additionally expected to raise OSError;
    the gid variants only make that check when HAVE_WHEEL_GROUP (defined
    elsewhere in this file) is false.
    """
    # uid_t and gid_t are 32-bit unsigned integers on Linux
    UID_OVERFLOW = (1 << 32)
    GID_OVERFLOW = (1 << 32)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(TypeError, os.setuid, 'not an int')
        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(TypeError, os.setgid, 'not an int')
        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        # This must exercise seteuid() itself; setegid() is covered by
        # test_setegid().
        self.assertRaises(TypeError, os.seteuid, 'not an int')
        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(TypeError, os.setegid, 'not an int')
        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        # Both positions (real and effective id) are validated separately.
        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    @support.requires_subprocess()
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    @support.requires_subprocess()
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        # Both positions (real and effective id) are validated separately.
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    @support.requires_subprocess()
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
2378
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Access files whose names are not plain ASCII through the os module.

    setUp() creates a directory holding one empty file for every candidate
    name that os.fsencode() accepts; the tests then reach those files via
    their decoded (str) names.
    """

    def setUp(self):
        # Use the first of these helper-provided names that is set.
        self.dir = (os_helper.TESTFN_UNENCODABLE
                    or os_helper.TESTFN_NONASCII
                    or os_helper.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # TESTFN_UNICODE is always a candidate, the other two only when set.
        candidates = [os_helper.TESTFN_UNICODE]
        for optional in (os_helper.TESTFN_UNENCODABLE,
                         os_helper.TESTFN_NONASCII):
            if optional:
                candidates.append(optional)

        encoded_names = []
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Cannot be encoded for the filesystem: leave it out.
                continue
            encoded_names.append(encoded)
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in encoded_names:
                os_helper.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so clean up here.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file for reading must succeed; nothing is read.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                            "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
2450
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to 100 times, 0.1 seconds apart, for the ready message.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Reached when the attempts ran out or the child exited early.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    @unittest.skipIf(mmap is None, "requires mmap")
    def _kill_with_event(self, event, name):
        # Run win_console_handler.py in a new process group, wait until
        # byte 0 of a shared tagged mapping reads 1 (presumably set by the
        # script once it is ready -- confirm against win_console_handler.py),
        # then send *event* with os.kill() and fail, using *name* in the
        # message, if the process has not exited 0.5 seconds later.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test ends, whatever its outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # status of 0 is falsy too, so compare against None explicitly.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    @support.requires_subprocess()
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    @support.requires_subprocess()
    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2568
2569
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with SUB0, SUB1 (directories) and FILE0, FILE1
        # (text files), remembering the entry names in sorted order.
        self.created_paths = []
        base = os_helper.TESTFN
        for index in range(2):
            subdir = 'SUB%d' % index
            filename = 'FILE%d' % index
            file_path = os.path.join(base, filename)
            # makedirs() also creates the TESTFN directory itself.
            os.makedirs(os.path.join(base, subdir))
            with open(file_path, 'w', encoding='utf-8') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths += [subdir, filename]
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(os_helper.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        str_entries = sorted(os.listdir(os_helper.TESTFN))
        self.assertEqual(str_entries, self.created_paths)

        # bytes
        bytes_entries = sorted(os.listdir(os.fsencode(os_helper.TESTFN)))
        expected_bytes = [os.fsencode(entry) for entry in self.created_paths]
        self.assertEqual(bytes_entries, expected_bytes)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(os_helper.TESTFN)

        # unicode
        str_entries = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(str_entries, self.created_paths)

        # bytes
        bytes_entries = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected_bytes = [os.fsencode(entry) for entry in self.created_paths]
        self.assertEqual(bytes_entries, expected_bytes)
2616
2617
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Tests for os.readlink() with str, bytes and path-like arguments."""
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        """Assert that two paths are equal after os.path.normcase().

        On Windows a leading '\\\\?\\' prefix is removed from either path
        before comparing.  Both str and bytes paths are supported.
        """
        def without_prefix(path):
            # Bad practice to blindly strip the prefix as it may be required to
            # correctly refer to the file, but we're only comparing paths here.
            prefix = b'\\\\?\\' if isinstance(path, bytes) else '\\\\?\\'
            return path[4:] if path.startswith(prefix) else path

        left, right = os.path.normcase(left), os.path.normcase(right)
        if sys.platform == 'win32':
            left, right = without_prefix(left), without_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        # readlink() on a regular file fails, for str and path-like alike.
        for arg in (self.filelink_target, FakePath(self.filelink_target)):
            with self.assertRaises(OSError):
                os.readlink(arg)

    def test_missing_link(self):
        for arg in ('missing-link', FakePath('missing-link')):
            with self.assertRaises(FileNotFoundError):
                os.readlink(arg)

    @os_helper.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(os_helper.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertPathEqual(resolved, self.filelink_target)

    @os_helper.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @os_helper.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2677
2678
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@os_helper.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic link handling by the os module on Windows."""
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Use assert methods rather than bare ``assert`` statements so the
        # preconditions are still verified when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is needed here: the link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        #  directory status and call RemoveDirectory if the symlink
        #  was created with target_is_dir==True.
        os.remove(self.missing_link)

    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        self.assertFalse(os.path.isdir(self.missing_link))

    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(os_helper.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(os_helper.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                            and os.path.exists(r'C:\ProgramData'),
                            'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            # Also test with bytes, since that is a separate code path.
            variants = (
                (src, dest),
                (os.fsencode(src), os.fsencode(dest)),
            )
            for link_src, link_dest in variants:
                try:
                    os.symlink(link_src, link_dest)
                except FileNotFoundError:
                    pass
                else:
                    # Best-effort removal of a link that did get created.
                    try:
                        os.remove(dest)
                    except OSError:
                        pass

    def test_appexeclink(self):
        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
        if not os.path.isdir(root):
            self.skipTest("test requires a WindowsApps directory")

        aliases = [os.path.join(root, a)
                   for a in fnmatch.filter(os.listdir(root), '*.exe')]

        for alias in aliases:
            if support.verbose:
                print()
                print("Testing with", alias)
            st = os.lstat(alias)
            self.assertEqual(st, os.stat(alias))
            self.assertFalse(stat.S_ISLNK(st.st_mode))
            self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
            # testing the first one we see is sufficient
            break
        else:
            self.skipTest("test requires an app execution alias")
2855
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for directory junctions created with _winapi.CreateJunction()."""
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use assert methods rather than bare ``assert`` statements so the
        # preconditions are still verified when Python runs with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.lexists(self.junction))

    def tearDown(self):
        if os.path.lexists(self.junction):
            os.unlink(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.lexists(self.junction))
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))
        # stat() follows the junction to its target, lstat() does not.
        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))
        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
                         os.path.normcase(os.readlink(self.junction)))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.lexists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2889
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    """Windows-only tests of nt._getfinalpathname() and os.stat()."""

    def test_getfinalpathname_handles(self):
        # Check that repeated nt._getfinalpathname() and os.stat() calls,
        # successful or not, leave the process handle count unchanged.
        nt = import_helper.import_module('nt')
        ctypes = import_helper.import_module('ctypes')
        import ctypes.wintypes

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
                                                 ctypes.wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        handle_count = ctypes.wintypes.DWORD()
        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
        self.assertEqual(1, ok)

        before_count = handle_count.value

        # The \\?\ names test the error path, __file__ tests the success path
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                try:
                    nt._getfinalpathname(name)
                except Exception:
                    # Failure is expected
                    pass
                try:
                    os.stat(name)
                except Exception:
                    pass

        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
        self.assertEqual(1, ok)

        handle_delta = handle_count.value - before_count

        self.assertEqual(0, handle_delta)

    @support.requires_subprocess()
    def test_stat_unlink_race(self):
        # bpo-46785: the implementation of os.stat() falls back to reading
        # the parent directory if CreateFileW() fails with a permission
        # error. If reading the parent directory fails because the file or
        # directory are subsequently unlinked, or because the volume or
        # share are no longer available, then the original permission error
        # should not be restored.
        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)
        # Parent and child both loop until this shared wall-clock deadline.
        deadline = time.time() + 5
        command = textwrap.dedent("""\
            import os
            import sys
            import time

            filename = sys.argv[1]
            deadline = float(sys.argv[2])

            while time.time() < deadline:
                try:
                    with open(filename, "w") as f:
                        pass
                except OSError:
                    pass
                try:
                    os.remove(filename)
                except OSError:
                    pass
            """)

        with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc:
            while time.time() < deadline:
                try:
                    os.stat(filename)
                except FileNotFoundError as e:
                    # An assert method, unlike a bare ``assert`` statement,
                    # is still checked when Python runs with -O.
                    self.assertEqual(e.winerror, 2)  # ERROR_FILE_NOT_FOUND
            try:
                proc.wait(1)
            except subprocess.TimeoutExpired:
                proc.terminate()
2981
2982
@os_helper.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check directory detection for a symlink with a link-relative target."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # An assert method, unlike a bare ``assert`` statement, is still
        # checked when Python runs with -O.
        self.assertTrue(os.path.isdir(src))
3013
3014
class FSEncodingTests(unittest.TestCase):
    """Basic checks of os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() come back
        # unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for original in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(original)
            except UnicodeEncodeError:
                # Not encodable with the filesystem encoding; nothing to check.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), original)
3028
3029
3030
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (
            sys.platform.startswith('win')
            or (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))
        ),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    @unittest.skipIf(support.is_emscripten,
                     "Cannot get encoding of stdin on Emscripten")
    def test_device_encoding(self):
        # The encoding reported for fd 0 must be a name codecs can look up.
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        self.assertTrue(codecs.lookup(stdin_encoding))
3047
3048
@support.requires_subprocess()
class PidTests(unittest.TestCase):
    """Tests for os.getppid(), os.waitpid() and os.waitstatus_to_exitcode()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def check_waitpid(self, code, exitcode, callback=None):
        """Spawn an interpreter running *code* and check how it terminates.

        The child is started with os.spawnv(os.P_NOWAIT, ...).  If *callback*
        is given it is called with the child's pid before waiting.  The
        status from os.waitpid(), converted by os.waitstatus_to_exitcode(),
        must equal *exitcode*, and the pid returned must be the child's.
        """
        on_windows = (sys.platform == 'win32')
        # On Windows, os.spawnv() simply joins arguments with spaces:
        # arguments need to be quoted
        interpreter = f'"{sys.executable}"' if on_windows else sys.executable
        snippet = f'"{code}"' if on_windows else code
        child_pid = os.spawnv(os.P_NOWAIT, sys.executable,
                              [interpreter, '-c', snippet])

        if callback is not None:
            callback(child_pid)

        # don't use support.wait_process() to test directly os.waitpid()
        # and os.waitstatus_to_exitcode()
        waited_pid, status = os.waitpid(child_pid, 0)
        self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
        self.assertEqual(waited_pid, child_pid)

    def test_waitpid(self):
        self.check_waitpid(code='pass', exitcode=0)

    def test_waitstatus_to_exitcode(self):
        expected = 23
        self.check_waitpid(f'import sys; sys.exit({expected})',
                           exitcode=expected)

        # A float is not an acceptable wait status.
        self.assertRaises(TypeError, os.waitstatus_to_exitcode, 0.0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitpid_windows(self):
        # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
        # with exit code larger than INT_MAX.
        STATUS_CONTROL_C_EXIT = 0xC000013A
        code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
        self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitstatus_to_exitcode_windows(self):
        # The statuses used here carry the exit code shifted left by 8 bits.
        max_exitcode = 2 ** 32 - 1
        for exitcode in (0, 1, 5, max_exitcode):
            status = exitcode << 8
            self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)

        # invalid values
        too_large = (max_exitcode + 1) << 8
        self.assertRaises(ValueError, os.waitstatus_to_exitcode, too_large)
        self.assertRaises(OverflowError, os.waitstatus_to_exitcode, -1)

    # Skip the test on Windows
    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
    def test_waitstatus_to_exitcode_kill(self):
        # The child sleeps until it is killed; a child terminated by a
        # signal is expected to yield the negated signal number.
        signum = signal.SIGKILL
        code = f'import time; time.sleep({support.LONG_TIMEOUT})'
        self.check_waitpid(code, exitcode=-signum,
                           callback=lambda pid: os.kill(pid, signum))
3120
3121
@support.requires_subprocess()
class SpawnTests(unittest.TestCase):
    # Tests for the os.spawn*() family.  Most tests run a tiny script that
    # exits with a known code and check that the spawn call reports it.

    @staticmethod
    def quote_args(args):
        # On Windows, os.spawn* simply joins arguments with spaces:
        # arguments need to be quoted
        if os.name != 'nt':
            return args
        return [f'"{arg}"' if " " in arg.strip() else arg for arg in args]

    def create_args(self, *, with_env=False, use_bytes=False):
        # Write a script to TESTFN that exits with self.exitcode and return
        # (program, args) suitable for the os.spawn*() functions.
        #
        # with_env:  also build self.env (a copy of os.environ plus a unique
        #            key stored in self.key) and make the script read that
        #            key from os.environ before exiting.
        # use_bytes: return program and args encoded with os.fsencode(); the
        #            environment is encoded as well when one was built.
        self.exitcode = 17

        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create a unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w", encoding="utf-8") as fp:
            fp.write(code)

        program = sys.executable
        args = self.quote_args([program, filename])
        if use_bytes:
            program = os.fsencode(program)
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally raised AttributeError for use_bytes alone.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return program, args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        program, args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, program, *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        program, args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, program, *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        program, args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, program, *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        program, args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, program, *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        program, args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, program, args)
        self.assertEqual(exitcode, self.exitcode)

        # Test for PyUnicode_FSConverter()
        exitcode = os.spawnv(os.P_WAIT, FakePath(program), args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        program, args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, program, args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        program, args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, program, args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        program, args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, program, args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        # With P_NOWAIT the call returns a pid; the exit code is checked
        # through support.wait_process().
        program, args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, program, args)
        support.wait_process(pid, exitcode=self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        program, args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, program, args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    # The *_noargs tests check that a missing argument list, or one whose
    # first argument is an empty string, is rejected with ValueError.

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        program, __ = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program)
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program, '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        program, __ = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        program, __ = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        program, __ = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, [''], {})

    def _test_invalid_env(self, spawn):
        # Shared body for the *_invalid_env tests.  *spawn* is called as
        # spawn(mode, program, args, env).
        program = sys.executable
        args = self.quote_args([program, '-c', 'pass'])

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            with self.subTest(name=name, value=value):
                newenv = os.environ.copy()
                newenv[name] = value
                # Two outcomes are accepted: the call raises ValueError, or
                # it returns and the reported exit code is 127.
                try:
                    exitcode = spawn(os.P_WAIT, program, args, newenv)
                except ValueError:
                    pass
                else:
                    self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)
        with open(filename, "w", encoding="utf-8") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')

        args = self.quote_args([program, filename])
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, program, args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
3310
3311
3312# The introduction of this TestCase caused at least two different errors on
3313# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # Only a non-empty login name is required; its value is not checked.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
3320
3321
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Round-trip checks for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise the nice value of this process by one, read it back, then
        # try to restore the starting value.
        which, who = os.PRIO_PROCESS, os.getpid()
        starting = os.getpriority(which, who)
        target = starting + 1
        os.setpriority(which, who, target)
        try:
            observed = os.getpriority(which, who)
            if starting >= 19 and observed <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % starting)
            self.assertEqual(observed, target)
        finally:
            # Restoring may be refused with EACCES; that case is tolerated,
            # any other OSError propagates.
            try:
                os.setpriority(which, who, starting)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
3344
3345
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.IsolatedAsyncioTestCase):
    # Every test gets its own asyncio server (asyncSetUp) that serves one
    # client, stores everything it receives in self.server_buffer and then
    # closes itself.  The tests push the contents of TESTFN through a
    # non-blocking client socket with os.sendfile().

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    # headers/trailers tests are not run on Linux, Solaris and SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')
    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
            'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        create_file(os_helper.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(os_helper.TESTFN)

    @staticmethod
    async def chunks(reader):
        # Yield whatever reader.read() returns until the reader is at EOF.
        while not reader.at_eof():
            yield await reader.read()

    async def handle_new_client(self, reader, writer):
        # Server-side callback: collect everything the client sends.
        self.server_buffer = b''.join([x async for x in self.chunks(reader)])
        writer.close()
        self.server.close()  # The test server processes a single client only

    async def asyncSetUp(self):
        self.server_buffer = b''
        self.server = await asyncio.start_server(self.handle_new_client,
                                                 socket_helper.HOSTv4)
        server_name = self.server.sockets[0].getsockname()
        self.client = socket.socket()
        self.client.setblocking(False)
        await asyncio.get_running_loop().sock_connect(self.client, server_name)
        self.sockno = self.client.fileno()
        self.file = open(os_helper.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    async def asyncTearDown(self):
        self.file.close()
        self.client.close()
        await self.server.wait_closed()

    # Use the test subject instead of asyncio.loop.sendfile
    @staticmethod
    async def async_sendfile(*args, **kwargs):
        # os.sendfile() is run in a worker thread via asyncio.to_thread().
        return await asyncio.to_thread(os.sendfile, *args, **kwargs)

    @staticmethod
    async def sendfile_wrapper(*args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().
        """
        while True:
            try:
                return await TestSendfile.async_sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno == errno.ECONNRESET:
                    # disconnected
                    raise
                elif err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                else:
                    raise

    async def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = await self.sendfile_wrapper(self.sockno, self.fileno,
                                               offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        await self.server.wait_closed()
        self.assertEqual(len(self.server_buffer), len(self.DATA))
        self.assertEqual(self.server_buffer, self.DATA)

    async def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = await self.sendfile_wrapper(self.sockno, self.fileno,
                                               offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        await self.server.wait_closed()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(self.server_buffer), len(expected))
        self.assertEqual(self.server_buffer, expected)

    async def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = await self.async_sendfile(self.sockno, self.fileno,
                                             offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        await self.server.wait_closed()
        self.assertEqual(self.server_buffer, b'')

    async def test_invalid_offset(self):
        # A negative offset must fail with EINVAL.
        with self.assertRaises(OSError) as cm:
            await self.async_sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    async def test_keywords(self):
        # Keyword arguments should be supported
        await self.async_sendfile(out_fd=self.sockno, in_fd=self.fileno,
                                  offset=0, count=4096)
        if self.SUPPORT_HEADERS_TRAILERS:
            await self.async_sendfile(out_fd=self.sockno, in_fd=self.fileno,
                                      offset=0, count=4096,
                                      headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    async def test_headers(self):
        total_sent = 0
        # Two header buffers followed by all of DATA except its last byte.
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = await self.async_sendfile(self.sockno, self.fileno, 0, 4096,
                                         headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = await self.sendfile_wrapper(self.sockno, self.fileno,
                                               offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        await self.server.wait_closed()
        # NOTE(review): hashes are compared rather than the buffers,
        # presumably to keep failure output small -- confirm.
        self.assertEqual(hash(self.server_buffer), hash(expected_data))

    @requires_headers_trailers
    async def test_trailers(self):
        TESTFN2 = os_helper.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(os_helper.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            # Send the first 5 bytes of the file, then both trailers.
            await self.async_sendfile(self.sockno, f.fileno(), 0, 5,
                                      trailers=[b"123456", b"789"])
            self.client.close()
            await self.server.wait_closed()
            self.assertEqual(self.server_buffer, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    async def test_headers_overflow_32bits(self):
        # The object returned by asyncio.start_server() has no
        # handler_instance attribute, so there is nothing to configure on
        # the server before the call.
        with self.assertRaises(OSError) as cm:
            await self.async_sendfile(self.sockno, self.fileno, 0, 0,
                                      headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    async def test_trailers_overflow_32bits(self):
        # The object returned by asyncio.start_server() has no
        # handler_instance attribute, so there is nothing to configure on
        # the server before the call.
        with self.assertRaises(OSError) as cm:
            await self.async_sendfile(self.sockno, self.fileno, 0, 0,
                                      trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    async def test_flags(self):
        # EBUSY and EAGAIN are tolerated; any other OSError is a failure.
        try:
            await self.async_sendfile(self.sockno, self.fileno, 0, 4096,
                                      flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
3561
3562
3563def supports_extended_attributes():
3564    if not hasattr(os, "setxattr"):
3565        return False
3566
3567    try:
3568        with open(os_helper.TESTFN, "xb", 0) as fp:
3569            try:
3570                os.setxattr(fp.fileno(), b"user.test", b"")
3571            except OSError:
3572                return False
3573    finally:
3574        os_helper.unlink(os_helper.TESTFN)
3575
3576    return True
3577
3578
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    # Tests for os.getxattr(), os.setxattr(), os.removexattr() and
    # os.listxattr(), addressed by path, by path without following
    # symlinks, and by file descriptor.

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run the full get/set/remove/list scenario on a fresh TESTFN.
        # *s* converts attribute names (str or os.fsencode); the four
        # callables take the file name first; **kwargs is forwarded to
        # getxattr, setxattr and removexattr.
        fn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, fn)
        create_file(fn)

        # Reading an attribute that was never set fails with ENODATA.
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)

        setxattr(fn, s("user.test"), b"", **kwargs)
        xattr = set(init_xattr)
        xattr.add("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing attribute fails with EEXIST.
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.EEXIST)

        # XATTR_REPLACE on a missing attribute fails with ENODATA.
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        xattr.add("user.test2")
        self.assertEqual(set(listxattr(fn)), xattr)
        removexattr(fn, s("user.test"), **kwargs)

        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        xattr.remove("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
        setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
        removexattr(fn, s("user.test"), **kwargs)
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            # Convert through s() like every other name in this scenario,
            # so the bytes run also sets these attributes by bytes name.
            setxattr(fn, s(thing), b"x", **kwargs)
        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with bytes names,
        # removing TESTFN after each run.
        self._check_xattrs_str(str, *args, **kwargs)
        os_helper.unlink(os_helper.TESTFN)

        self._check_xattrs_str(os.fsencode, *args, **kwargs)
        os_helper.unlink(os_helper.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrap each function so it receives a file descriptor opened on the
        # path instead of the path itself.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)
        def setxattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.setxattr(fp.fileno(), *args)
        def removexattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.removexattr(fp.fileno(), *args)
        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)
        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3662
3663
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            dimensions = os.get_terminal_size()
        except OSError as e:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            if sys.platform != "win32" and e.errno not in (errno.EINVAL,
                                                           errno.ENOTTY):
                raise
            self.skipTest("failed to query terminal size")

        for extent in (dimensions.columns, dimensions.lines):
            self.assertGreaterEqual(extent, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly. If stty succeeded, then get_terminal_size()
        should work too.
        """
        stty_cmd = ["stty", "size"]
        try:
            report = subprocess.check_output(
                stty_cmd, stderr=subprocess.DEVNULL, text=True
            )
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        fields = report.split()
        # The two fields are swapped to build the expected tuple.
        expected = (int(fields[1]), int(fields[0])) # reversed order

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as e:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            if sys.platform != "win32" and e.errno not in (errno.EINVAL,
                                                           errno.ENOTTY):
                raise
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
3711
3712
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    def test_memfd_create(self):
        # Created with an explicit MFD_CLOEXEC flag: the descriptor must be
        # non-inheritable and writable.
        flagged_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(flagged_fd, -1)
        self.addCleanup(os.close, flagged_fd)
        self.assertFalse(os.get_inheritable(flagged_fd))
        with open(flagged_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            position = stream.tell()
            self.assertEqual(position, 12)

        # Created without flags: the descriptor must still be
        # non-inheritable.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3728
3729
@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd')
@support.requires_linux_version(2, 6, 30)
class EventfdTests(unittest.TestCase):
    def test_eventfd_initval(self):
        # Values cross the descriptor as a native uint64_t, 8 bytes each.
        pack = struct.Struct("@Q").pack
        width = 8
        start = 42
        fd = os.eventfd(start)
        self.assertNotEqual(fd, -1)
        self.addCleanup(os.close, fd)
        self.assertFalse(os.get_inheritable(fd))

        # test with raw read/write
        self.assertEqual(os.read(fd, width), pack(start))

        os.write(fd, pack(23))
        self.assertEqual(os.read(fd, width), pack(23))

        # Two writes before a read: the read returns their sum.
        for increment in (40, 2):
            os.write(fd, pack(increment))
        self.assertEqual(os.read(fd, width), pack(42))

        # test with eventfd_read/eventfd_write
        for increment in (20, 3):
            os.eventfd_write(fd, increment)
        self.assertEqual(os.eventfd_read(fd), 23)

    def test_eventfd_semaphore(self):
        start = 2
        fd = os.eventfd(start,
                        os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK)
        self.assertNotEqual(fd, -1)
        self.addCleanup(os.close, fd)

        # the semaphore starts at initval 2, so two reads return 1 each
        for _ in range(start):
            self.assertEqual(os.eventfd_read(fd), 1)
        # third read would block
        with self.assertRaises(BlockingIOError):
            os.eventfd_read(fd)
        with self.assertRaises(BlockingIOError):
            os.read(fd, 8)

        # increase semaphore counter, read one
        os.eventfd_write(fd, 1)
        self.assertEqual(os.eventfd_read(fd), 1)
        # next read would block, too
        with self.assertRaises(BlockingIOError):
            os.eventfd_read(fd)

    def test_eventfd_select(self):
        fd = os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK)
        self.assertNotEqual(fd, -1)
        self.addCleanup(os.close, fd)

        # counter is zero, only writeable
        readable, writable, exceptional = select.select([fd], [fd], [fd], 0)
        self.assertEqual((readable, writable, exceptional), ([], [fd], []))

        # counter is non-zero, read and writeable
        os.eventfd_write(fd, 23)
        readable, writable, exceptional = select.select([fd], [fd], [fd], 0)
        self.assertEqual((readable, writable, exceptional), ([fd], [fd], []))
        self.assertEqual(os.eventfd_read(fd), 23)

        # counter at max, only readable
        os.eventfd_write(fd, (2**64) - 2)
        readable, writable, exceptional = select.select([fd], [fd], [fd], 0)
        self.assertEqual((readable, writable, exceptional), ([fd], [], []))
        os.eventfd_read(fd)
3811
3812
class OSErrorTests(unittest.TestCase):
    # Every path-taking os function exercised here is expected to fail, and
    # the OSError it raises must expose the caller's own path object as its
    # filename attribute.

    def setUp(self):
        class Str(str):
            pass

        # Use the unencodable/undecodable test names when os_helper provides
        # them, otherwise fall back to the plain TESTFN.
        decoded = os_helper.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = os_helper.TESTFN
        self.unicode_filenames = [decoded, Str(decoded)]

        encoded = os_helper.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(os_helper.TESTFN)
        self.bytes_filenames = [
            encoded,
            bytearray(encoded),
            memoryview(encoded),
        ]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every_name = self.filenames
        bytes_names = self.bytes_filenames
        text_names = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry is (filenames to try, function, *extra positional args).
        funcs = [
            (every_name, os.chdir),
            (every_name, os.lstat),
            (every_name, os.open, os.O_RDONLY),
            (every_name, os.rmdir),
            (every_name, os.stat),
            (every_name, os.unlink),
        ]
        if on_windows:
            funcs += [
                (bytes_names, os.rename, b"dst"),
                (bytes_names, os.replace, b"dst"),
                (text_names, os.rename, "dst"),
                (text_names, os.replace, "dst"),
                (text_names, os.listdir),
            ]
        else:
            funcs += [
                (every_name, os.listdir),
                (every_name, os.rename, "dst"),
                (every_name, os.replace, "dst"),
            ]
        if os_helper.can_chmod():
            funcs.append((every_name, os.chmod, 0o777))

        # Functions that are only present on some platforms, listed with the
        # extra arguments each one needs.
        for attr, extra in (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        ):
            if hasattr(os, attr):
                funcs.append((every_name, getattr(os, attr), *extra))

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((bytes_names, os.link, b"dst"))
                funcs.append((text_names, os.link, "dst"))
            else:
                funcs.append((every_name, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every_name, os.listxattr),
                (every_name, os.getxattr, "user.test"),
                (every_name, os.setxattr, "user.test", b'user'),
                (every_name, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every_name, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((every_name, os.readlink))

        for candidates, func, *extra in funcs:
            for name in candidates:
                try:
                    if not isinstance(name, (str, bytes)):
                        # Paths that are neither str nor bytes must also
                        # trigger a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning,
                                                   'should be'):
                            func(name, *extra)
                    else:
                        func(name, *extra)
                except UnicodeDecodeError:
                    pass
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                else:
                    self.fail(f"No exception thrown by {func}")
3906
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() may return None; there is nothing to verify then.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3915
3916
# FD inheritance check is only useful for systems with process support.
@support.requires_subprocess()
class FDInheritanceTests(unittest.TestCase):
    def _open_this_file(self, flags):
        # Open this source file with *flags* and register the descriptor
        # for closing at the end of the test.
        fd = os.open(__file__, flags)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file(os.O_RDONLY)
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file(os.O_RDONLY)
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        without_cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, without_cloexec)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file(os.O_RDONLY)

        def cloexec_bit():
            # Current state of the FD_CLOEXEC bit as reported by fcntl.
            return fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC

        self.assertEqual(cloexec_bit(), fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        self.assertEqual(cloexec_bit(), 0)

    @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH")
    def test_get_set_inheritable_o_path(self):
        fd = self._open_this_file(os.O_PATH)
        self.assertEqual(os.get_inheritable(fd), False)

        # Switch the flag on and back off again.
        for wanted in (True, False):
            os.set_inheritable(fd, wanted)
            self.assertEqual(os.get_inheritable(fd), wanted)

    def test_get_set_inheritable_badf(self):
        bad_fd = os_helper.make_bad_fd()

        # Getter and setter must both report EBADF for an invalid descriptor.
        attempts = (
            (os.get_inheritable, (bad_fd,)),
            (os.set_inheritable, (bad_fd, True)),
            (os.set_inheritable, (bad_fd, False)),
        )
        for func, args in attempts:
            with self.assertRaises(OSError) as ctx:
                func(*args)
            self.assertEqual(ctx.exception.errno, errno.EBADF)

    def test_open(self):
        fd = self._open_this_file(os.O_RDONLY)
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        ends = os.pipe()
        for fd in ends:
            self.addCleanup(os.close, fd)
        for fd in ends:
            self.assertEqual(os.get_inheritable(fd), False)

    def test_dup(self):
        original = self._open_this_file(os.O_RDONLY)

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    def test_dup_standard_stream(self):
        duplicate = os.dup(1)
        self.addCleanup(os.close, duplicate)
        self.assertGreater(duplicate, 0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        nul_fd = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, nul_fd)
        duplicate = os.dup(nul_fd)
        self.addCleanup(os.close, duplicate)
        self.assertFalse(os.get_inheritable(duplicate))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_this_file(os.O_RDONLY)

        # inheritable by default
        target = self._open_this_file(os.O_RDONLY)
        self.assertEqual(os.dup2(source_fd, target), target)
        self.assertTrue(os.get_inheritable(target))

        # force non-inheritable
        private_target = self._open_this_file(os.O_RDONLY)
        self.assertEqual(
            os.dup2(source_fd, private_target, inheritable=False),
            private_target)
        self.assertFalse(os.get_inheritable(private_target))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pty_fds = os.openpty()
        for fd in pty_fds:
            self.addCleanup(os.close, fd)
        for fd in pty_fds:
            self.assertEqual(os.get_inheritable(fd), False)
4038
4039
class PathTConverterTests(unittest.TestCase):
    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (os.O_RDONLY,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = os_helper.TESTFN
        # The bytes variants are not exercised when os.name is 'nt'.
        bytes_filename = bytes_fspath = None
        if os.name != 'nt':
            bytes_filename = os.fsencode(os_helper.TESTFN)
            bytes_fspath = FakePath(bytes_filename)

        fd = os.open(FakePath(str_filename), os.O_WRONLY | os.O_CREAT)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)
        candidates = (str_filename, bytes_filename, str_fspath, bytes_fspath)
        valid_paths = [path for path in candidates if path is not None]

        def call(fn, target, extra_args, cleanup_fn):
            # Invoke fn and release whatever it returned, if required.
            result = fn(target, *extra_args)
            if cleanup_fn is not None:
                cleanup_fn(result)

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # Not available on this platform.
                    continue

                for path in valid_paths:
                    with self.subTest(name=name, path=path):
                        call(fn, path, extra_args, cleanup_fn)

                # __fspath__() returning an int is never acceptable.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(TypeError, 'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    # should not fail
                    call(fn, fd, extra_args, cleanup_fn)

    def test_path_t_converter_and_custom_class(self):
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        # (value returned by __fspath__, type name expected in the message)
        cases = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for value, type_name in cases:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(value))
4104
4105
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
@unittest.skipIf(support.is_emscripten, "Cannot unset blocking flag")
class BlockingTests(unittest.TestCase):
    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_blocking(fd), True)

        # Turn blocking mode off, then back on, reading the flag back
        # after each change.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
4120
4121
4122
class ExportsTests(unittest.TestCase):
    def test_os_all(self):
        # Spot-check two names that os.__all__ must export.
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
4127
4128
class TestDirEntry(unittest.TestCase):
    def setUp(self):
        # Work inside a scratch directory that is removed after the test.
        self.path = os.path.realpath(os_helper.TESTFN)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def test_uninstantiable(self):
        self.assertRaises(TypeError, os.DirEntry)

    def test_unpickable(self):
        create_file(os.path.join(self.path, "file.txt"), b'python')
        with os.scandir(self.path) as scandir_iter:
            entry = list(scandir_iter).pop()
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, "file.txt")
        # Call dumps() with the entry alone.  Its second positional argument
        # is the pickle protocol; an invalid protocol raises TypeError on its
        # own and would hide whether the entry itself is picklable.
        self.assertRaises(TypeError, pickle.dumps, entry)
4145
4146
class TestScandir(unittest.TestCase):
    check_no_resource_warning = warnings_helper.check_no_resource_warning

    def setUp(self):
        # Work inside a scratch directory that is removed after the test;
        # keep both the str and the bytes spelling of its path.
        self.path = os.path.realpath(os_helper.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        # Create a file called *name* in the scratch directory and return its
        # full path.  The path is bytes when *name* is bytes.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        # Scan the scratch directory, check that it contains exactly *names*
        # (a sorted list) and return a {name: DirEntry} mapping.
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries.keys()), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        # Compare two stat results.  With *skip_fields* true, compare the
        # st_* attributes one by one and leave out st_dev, st_ino and
        # st_nlink; otherwise compare the results as a whole.
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def test_uninstantiable(self):
        # The context manager closes the iterator even if the check fails.
        with os.scandir(self.path) as scandir_iter:
            self.assertRaises(TypeError, type(scandir_iter))

    def test_unpickable(self):
        self.create_file("file.txt")
        with os.scandir(self.path) as scandir_iter:
            # Call dumps() with the iterator alone.  Its second positional
            # argument is the pickle protocol; an invalid protocol raises
            # TypeError on its own and would make this check vacuous.
            self.assertRaises(TypeError, pickle.dumps, scandir_iter)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        # Check that *entry* agrees with what os.stat() and os.path report
        # for the same path.  *is_symlink* only selects how strictly the
        # stat results are compared; *is_dir* and *is_file* are accepted
        # but not used.
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = os_helper.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # Must stay sorted: get_entries() compares against sorted names.
        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        # Return the single entry of the scratch directory, which must be
        # called *name*.  The directory is scanned with a bytes path when
        # *name* is bytes.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        # Create a file and return the DirEntry that scandir() yields for it.
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries.keys()),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_broken_symlink(self):
        if not os_helper.can_symlink():
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target leaves the symlink dangling.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            # The entry must expose real bytes, not the bytes-like input type.
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if os_helper.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        with os_helper.open_dir_fd(self.path) as fd:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # When scanning a descriptor, the path is just the name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)

    @unittest.skipIf(support.is_wasi, "WASI maps '' to cwd")
    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # An explicit close() inside the with block must not make the
        # context manager's exit fail.
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # partially consumed iterator that is never closed
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
4489
4490
class TestPEP519(unittest.TestCase):

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        as_text = "path/like/object"
        as_bytes = b"path/like/object"
        for raw in (as_text, as_bytes):
            wrapped = FakePath(raw)

            # fspath() hands back the wrapped value untouched, while
            # fsencode()/fsdecode() convert it to bytes/str respectively.
            self.assertEqual(raw, self.fspath(wrapped))
            self.assertEqual(as_bytes, os.fsencode(wrapped))
            self.assertEqual(as_text, os.fsdecode(wrapped))

    def test_pathlike(self):
        wrapped = FakePath('#feelthegil')
        self.assertEqual('#feelthegil', self.fspath(wrapped))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        vapor = type('blah', (), {})
        for junk in (int, type, os, vapor()):
            with self.assertRaises(TypeError):
                self.fspath(junk)

    def test_argument_required(self):
        with self.assertRaises(TypeError):
            self.fspath()

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        returns_int = FakePath(42)
        with self.assertRaises(TypeError):
            self.fspath(returns_int)
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})
        with self.assertRaises(TypeError):
            self.fspath(not_callable())
        # __fspath__ raises an exception.
        raises = FakePath(ZeroDivisionError())
        with self.assertRaises(ZeroDivisionError):
            self.fspath(raises)

    def test_pathlike_subclasshook(self):
        # bpo-38878: subclasshook causes subclass checks
        # true on abstract implementation.
        class A(os.PathLike):
            pass
        self.assertFalse(issubclass(FakePath, A))
        self.assertTrue(issubclass(FakePath, os.PathLike))

    def test_pathlike_class_getitem(self):
        alias = os.PathLike[bytes]
        self.assertIsInstance(alias, types.GenericAlias)
4547
4548
class TimesTests(unittest.TestCase):
    def test_times(self):
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        # Every field of the result must be a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field_name in field_names:
            self.assertIsInstance(getattr(result, field_name), float)

        if os.name != 'nt':
            return
        # When os.name is 'nt', these three fields are expected to be zero.
        for field_name in ('children_user', 'children_system', 'elapsed'):
            self.assertEqual(getattr(result, field_name), 0)
4563
4564
@support.requires_fork()
class ForkTests(unittest.TestCase):
    def test_fork(self):
        # bpo-42540: ensure os.fork() with non-default memory allocator does
        # not crash on exit.
        code = """if 1:
            import os
            from test import support
            pid = os.fork()
            if pid != 0:
                support.wait_process(pid, exitcode=0)
        """
        # Run the script once with the default environment and once with
        # PYTHONMALLOC set to malloc_debug.
        for extra_env in ({}, {"PYTHONMALLOC": "malloc_debug"}):
            assert_python_ok("-c", code, **extra_env)
4579
4580
# os._fspath is only looked for here because, when it is missing, TestPEP519
# has already exercised the pure Python implementation through os.fspath.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):
        """Explicitly test the pure Python implementation of os.fspath()."""

        # Re-run every inherited test against os._fspath instead of os.fspath.
        fspath = staticmethod(os._fspath)
4589
4590
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()
4593