1# As a test suite for the os module, this is woefully inadequate, but this 2# does add tests for a few functions which have been determined to be more 3# portable than they had been thought to be. 4 5import asyncio 6import codecs 7import contextlib 8import decimal 9import errno 10import fnmatch 11import fractions 12import itertools 13import locale 14import os 15import pickle 16import select 17import shutil 18import signal 19import socket 20import stat 21import struct 22import subprocess 23import sys 24import sysconfig 25import tempfile 26import textwrap 27import time 28import types 29import unittest 30import uuid 31import warnings 32from test import support 33from test.support import import_helper 34from test.support import os_helper 35from test.support import socket_helper 36from test.support import warnings_helper 37from platform import win32_is_iot 38 39try: 40 import resource 41except ImportError: 42 resource = None 43try: 44 import fcntl 45except ImportError: 46 fcntl = None 47try: 48 import _winapi 49except ImportError: 50 _winapi = None 51try: 52 import pwd 53 all_users = [u.pw_uid for u in pwd.getpwall()] 54except (ImportError, AttributeError): 55 all_users = [] 56try: 57 from _testcapi import INT_MAX, PY_SSIZE_T_MAX 58except ImportError: 59 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize 60 61try: 62 import mmap 63except ImportError: 64 mmap = None 65 66from test.support.script_helper import assert_python_ok 67from test.support import unix_shell 68from test.support.os_helper import FakePath 69 70 71root_in_posix = False 72if hasattr(os, 'geteuid'): 73 root_in_posix = (os.geteuid() == 0) 74 75# Detect whether we're on a Linux system that uses the (now outdated 76# and unmaintained) linuxthreads threading library. There's an issue 77# when combining linuxthreads with a failed execv call: see 78# http://bugs.python.org/issue4970. 79if hasattr(sys, 'thread_info') and sys.thread_info.version: 80 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads") 81else: 82 USING_LINUXTHREADS = False 83 84# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group. 85HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0 86 87 88def requires_os_func(name): 89 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name) 90 91 92def create_file(filename, content=b'content'): 93 with open(filename, "xb", 0) as fp: 94 fp.write(content) 95 96 97# bpo-41625: On AIX, splice() only works with a socket, not with a pipe. 98requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"), 99 'on AIX, splice() only accepts sockets') 100 101 102def tearDownModule(): 103 asyncio.set_event_loop_policy(None) 104 105 106class MiscTests(unittest.TestCase): 107 def test_getcwd(self): 108 cwd = os.getcwd() 109 self.assertIsInstance(cwd, str) 110 111 def test_getcwd_long_path(self): 112 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On 113 # Windows, MAX_PATH is defined as 260 characters, but Windows supports 114 # longer path if longer paths support is enabled. Internally, the os 115 # module uses MAXPATHLEN which is at least 1024. 116 # 117 # Use a directory name of 200 characters to fit into Windows MAX_PATH 118 # limit. 119 # 120 # On Windows, the test can stop when trying to create a path longer 121 # than MAX_PATH if long paths support is disabled: 122 # see RtlAreLongPathsEnabled(). 123 min_len = 2000 # characters 124 # On VxWorks, PATH_MAX is defined as 1024 bytes. Creating a path 125 # longer than PATH_MAX will fail. 126 if sys.platform == 'vxworks': 127 min_len = 1000 128 dirlen = 200 # characters 129 dirname = 'python_test_dir_' 130 dirname = dirname + ('a' * (dirlen - len(dirname))) 131 132 with tempfile.TemporaryDirectory() as tmpdir: 133 with os_helper.change_cwd(tmpdir) as path: 134 expected = path 135 136 while True: 137 cwd = os.getcwd() 138 self.assertEqual(cwd, expected) 139 140 need = min_len - (len(cwd) + len(os.path.sep)) 141 if need <= 0: 142 break 143 if len(dirname) > need and need > 0: 144 dirname = dirname[:need] 145 146 path = os.path.join(path, dirname) 147 try: 148 os.mkdir(path) 149 # On Windows, chdir() can fail 150 # even if mkdir() succeeded 151 os.chdir(path) 152 except FileNotFoundError: 153 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and 154 # ERROR_FILENAME_EXCED_RANGE (206) errors 155 # ("The filename or extension is too long") 156 break 157 except OSError as exc: 158 if exc.errno == errno.ENAMETOOLONG: 159 break 160 else: 161 raise 162 163 expected = path 164 165 if support.verbose: 166 print(f"Tested current directory length: {len(cwd)}") 167 168 def test_getcwdb(self): 169 cwd = os.getcwdb() 170 self.assertIsInstance(cwd, bytes) 171 self.assertEqual(os.fsdecode(cwd), os.getcwd()) 172 173 174# Tests creating TESTFN 175class FileTests(unittest.TestCase): 176 def setUp(self): 177 if os.path.lexists(os_helper.TESTFN): 178 os.unlink(os_helper.TESTFN) 179 tearDown = setUp 180 181 def test_access(self): 182 f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) 183 os.close(f) 184 self.assertTrue(os.access(os_helper.TESTFN, os.W_OK)) 185 186 @unittest.skipIf( 187 support.is_emscripten, "Test is unstable under Emscripten." 188 ) 189 @unittest.skipIf( 190 support.is_wasi, "WASI does not support dup." 191 ) 192 def test_closerange(self): 193 first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) 194 # We must allocate two consecutive file descriptors, otherwise 195 # it will mess up other file descriptors (perhaps even the three 196 # standard ones). 197 second = os.dup(first) 198 try: 199 retries = 0 200 while second != first + 1: 201 os.close(first) 202 retries += 1 203 if retries > 10: 204 # XXX test skipped 205 self.skipTest("couldn't allocate two consecutive fds") 206 first, second = second, os.dup(second) 207 finally: 208 os.close(second) 209 # close a fd that is open, and one that isn't 210 os.closerange(first, first + 2) 211 self.assertRaises(OSError, os.write, first, b"a") 212 213 @support.cpython_only 214 def test_rename(self): 215 path = os_helper.TESTFN 216 old = sys.getrefcount(path) 217 self.assertRaises(TypeError, os.rename, path, 0) 218 new = sys.getrefcount(path) 219 self.assertEqual(old, new) 220 221 def test_read(self): 222 with open(os_helper.TESTFN, "w+b") as fobj: 223 fobj.write(b"spam") 224 fobj.flush() 225 fd = fobj.fileno() 226 os.lseek(fd, 0, 0) 227 s = os.read(fd, 4) 228 self.assertEqual(type(s), bytes) 229 self.assertEqual(s, b"spam") 230 231 @support.cpython_only 232 # Skip the test on 32-bit platforms: the number of bytes must fit in a 233 # Py_ssize_t type 234 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX, 235 "needs INT_MAX < PY_SSIZE_T_MAX") 236 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False) 237 def test_large_read(self, size): 238 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 239 create_file(os_helper.TESTFN, b'test') 240 241 # Issue #21932: Make sure that os.read() does not raise an 242 # OverflowError for size larger than INT_MAX 243 with open(os_helper.TESTFN, "rb") as fp: 244 data = os.read(fp.fileno(), size) 245 246 # The test does not try to read more than 2 GiB at once because the 247 # operating system is free to return less bytes than requested. 248 self.assertEqual(data, b'test') 249 250 def test_write(self): 251 # os.write() accepts bytes- and buffer-like objects but not strings 252 fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY) 253 self.assertRaises(TypeError, os.write, fd, "beans") 254 os.write(fd, b"bacon\n") 255 os.write(fd, bytearray(b"eggs\n")) 256 os.write(fd, memoryview(b"spam\n")) 257 os.close(fd) 258 with open(os_helper.TESTFN, "rb") as fobj: 259 self.assertEqual(fobj.read().splitlines(), 260 [b"bacon", b"eggs", b"spam"]) 261 262 def write_windows_console(self, *args): 263 retcode = subprocess.call(args, 264 # use a new console to not flood the test output 265 creationflags=subprocess.CREATE_NEW_CONSOLE, 266 # use a shell to hide the console window (SW_HIDE) 267 shell=True) 268 self.assertEqual(retcode, 0) 269 270 @unittest.skipUnless(sys.platform == 'win32', 271 'test specific to the Windows console') 272 def test_write_windows_console(self): 273 # Issue #11395: the Windows console returns an error (12: not enough 274 # space error) on writing into stdout if stdout mode is binary and the 275 # length is greater than 66,000 bytes (or less, depending on heap 276 # usage). 277 code = "print('x' * 100000)" 278 self.write_windows_console(sys.executable, "-c", code) 279 self.write_windows_console(sys.executable, "-u", "-c", code) 280 281 def fdopen_helper(self, *args): 282 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 283 f = os.fdopen(fd, *args, encoding="utf-8") 284 f.close() 285 286 def test_fdopen(self): 287 fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) 288 os.close(fd) 289 290 self.fdopen_helper() 291 self.fdopen_helper('r') 292 self.fdopen_helper('r', 100) 293 294 def test_replace(self): 295 TESTFN2 = os_helper.TESTFN + ".2" 296 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 297 self.addCleanup(os_helper.unlink, TESTFN2) 298 299 create_file(os_helper.TESTFN, b"1") 300 create_file(TESTFN2, b"2") 301 302 os.replace(os_helper.TESTFN, TESTFN2) 303 self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN) 304 with open(TESTFN2, 'r', encoding='utf-8') as f: 305 self.assertEqual(f.read(), "1") 306 307 def test_open_keywords(self): 308 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777, 309 dir_fd=None) 310 os.close(f) 311 312 def test_symlink_keywords(self): 313 symlink = support.get_attribute(os, "symlink") 314 try: 315 symlink(src='target', dst=os_helper.TESTFN, 316 target_is_directory=False, dir_fd=None) 317 except (NotImplementedError, OSError): 318 pass # No OS support or unprivileged user 319 320 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 321 def test_copy_file_range_invalid_values(self): 322 with self.assertRaises(ValueError): 323 os.copy_file_range(0, 1, -10) 324 325 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 326 def test_copy_file_range(self): 327 TESTFN2 = os_helper.TESTFN + ".3" 328 data = b'0123456789' 329 330 create_file(os_helper.TESTFN, data) 331 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 332 333 in_file = open(os_helper.TESTFN, 'rb') 334 self.addCleanup(in_file.close) 335 in_fd = in_file.fileno() 336 337 out_file = open(TESTFN2, 'w+b') 338 self.addCleanup(os_helper.unlink, TESTFN2) 339 self.addCleanup(out_file.close) 340 out_fd = out_file.fileno() 341 342 try: 343 i = os.copy_file_range(in_fd, out_fd, 5) 344 except OSError as e: 345 # Handle the case in which Python was compiled 346 # in a system with the syscall but without support 347 # in the kernel. 348 if e.errno != errno.ENOSYS: 349 raise 350 self.skipTest(e) 351 else: 352 # The number of copied bytes can be less than 353 # the number of bytes originally requested. 354 self.assertIn(i, range(0, 6)); 355 356 with open(TESTFN2, 'rb') as in_file: 357 self.assertEqual(in_file.read(), data[:i]) 358 359 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 360 def test_copy_file_range_offset(self): 361 TESTFN4 = os_helper.TESTFN + ".4" 362 data = b'0123456789' 363 bytes_to_copy = 6 364 in_skip = 3 365 out_seek = 5 366 367 create_file(os_helper.TESTFN, data) 368 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 369 370 in_file = open(os_helper.TESTFN, 'rb') 371 self.addCleanup(in_file.close) 372 in_fd = in_file.fileno() 373 374 out_file = open(TESTFN4, 'w+b') 375 self.addCleanup(os_helper.unlink, TESTFN4) 376 self.addCleanup(out_file.close) 377 out_fd = out_file.fileno() 378 379 try: 380 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy, 381 offset_src=in_skip, 382 offset_dst=out_seek) 383 except OSError as e: 384 # Handle the case in which Python was compiled 385 # in a system with the syscall but without support 386 # in the kernel. 387 if e.errno != errno.ENOSYS: 388 raise 389 self.skipTest(e) 390 else: 391 # The number of copied bytes can be less than 392 # the number of bytes originally requested. 393 self.assertIn(i, range(0, bytes_to_copy+1)); 394 395 with open(TESTFN4, 'rb') as in_file: 396 read = in_file.read() 397 # seeked bytes (5) are zero'ed 398 self.assertEqual(read[:out_seek], b'\x00'*out_seek) 399 # 012 are skipped (in_skip) 400 # 345678 are copied in the file (in_skip + bytes_to_copy) 401 self.assertEqual(read[out_seek:], 402 data[in_skip:in_skip+i]) 403 404 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 405 def test_splice_invalid_values(self): 406 with self.assertRaises(ValueError): 407 os.splice(0, 1, -10) 408 409 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 410 @requires_splice_pipe 411 def test_splice(self): 412 TESTFN2 = os_helper.TESTFN + ".3" 413 data = b'0123456789' 414 415 create_file(os_helper.TESTFN, data) 416 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 417 418 in_file = open(os_helper.TESTFN, 'rb') 419 self.addCleanup(in_file.close) 420 in_fd = in_file.fileno() 421 422 read_fd, write_fd = os.pipe() 423 self.addCleanup(lambda: os.close(read_fd)) 424 self.addCleanup(lambda: os.close(write_fd)) 425 426 try: 427 i = os.splice(in_fd, write_fd, 5) 428 except OSError as e: 429 # Handle the case in which Python was compiled 430 # in a system with the syscall but without support 431 # in the kernel. 432 if e.errno != errno.ENOSYS: 433 raise 434 self.skipTest(e) 435 else: 436 # The number of copied bytes can be less than 437 # the number of bytes originally requested. 438 self.assertIn(i, range(0, 6)); 439 440 self.assertEqual(os.read(read_fd, 100), data[:i]) 441 442 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 443 @requires_splice_pipe 444 def test_splice_offset_in(self): 445 TESTFN4 = os_helper.TESTFN + ".4" 446 data = b'0123456789' 447 bytes_to_copy = 6 448 in_skip = 3 449 450 create_file(os_helper.TESTFN, data) 451 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 452 453 in_file = open(os_helper.TESTFN, 'rb') 454 self.addCleanup(in_file.close) 455 in_fd = in_file.fileno() 456 457 read_fd, write_fd = os.pipe() 458 self.addCleanup(lambda: os.close(read_fd)) 459 self.addCleanup(lambda: os.close(write_fd)) 460 461 try: 462 i = os.splice(in_fd, write_fd, bytes_to_copy, offset_src=in_skip) 463 except OSError as e: 464 # Handle the case in which Python was compiled 465 # in a system with the syscall but without support 466 # in the kernel. 467 if e.errno != errno.ENOSYS: 468 raise 469 self.skipTest(e) 470 else: 471 # The number of copied bytes can be less than 472 # the number of bytes originally requested. 473 self.assertIn(i, range(0, bytes_to_copy+1)); 474 475 read = os.read(read_fd, 100) 476 # 012 are skipped (in_skip) 477 # 345678 are copied in the file (in_skip + bytes_to_copy) 478 self.assertEqual(read, data[in_skip:in_skip+i]) 479 480 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 481 @requires_splice_pipe 482 def test_splice_offset_out(self): 483 TESTFN4 = os_helper.TESTFN + ".4" 484 data = b'0123456789' 485 bytes_to_copy = 6 486 out_seek = 3 487 488 create_file(os_helper.TESTFN, data) 489 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 490 491 read_fd, write_fd = os.pipe() 492 self.addCleanup(lambda: os.close(read_fd)) 493 self.addCleanup(lambda: os.close(write_fd)) 494 os.write(write_fd, data) 495 496 out_file = open(TESTFN4, 'w+b') 497 self.addCleanup(os_helper.unlink, TESTFN4) 498 self.addCleanup(out_file.close) 499 out_fd = out_file.fileno() 500 501 try: 502 i = os.splice(read_fd, out_fd, bytes_to_copy, offset_dst=out_seek) 503 except OSError as e: 504 # Handle the case in which Python was compiled 505 # in a system with the syscall but without support 506 # in the kernel. 507 if e.errno != errno.ENOSYS: 508 raise 509 self.skipTest(e) 510 else: 511 # The number of copied bytes can be less than 512 # the number of bytes originally requested. 513 self.assertIn(i, range(0, bytes_to_copy+1)); 514 515 with open(TESTFN4, 'rb') as in_file: 516 read = in_file.read() 517 # seeked bytes (5) are zero'ed 518 self.assertEqual(read[:out_seek], b'\x00'*out_seek) 519 # 012 are skipped (in_skip) 520 # 345678 are copied in the file (in_skip + bytes_to_copy) 521 self.assertEqual(read[out_seek:], data[:i]) 522 523 524# Test attributes on return values from os.*stat* family. 525class StatAttributeTests(unittest.TestCase): 526 def setUp(self): 527 self.fname = os_helper.TESTFN 528 self.addCleanup(os_helper.unlink, self.fname) 529 create_file(self.fname, b"ABC") 530 531 def check_stat_attributes(self, fname): 532 result = os.stat(fname) 533 534 # Make sure direct access works 535 self.assertEqual(result[stat.ST_SIZE], 3) 536 self.assertEqual(result.st_size, 3) 537 538 # Make sure all the attributes are there 539 members = dir(result) 540 for name in dir(stat): 541 if name[:3] == 'ST_': 542 attr = name.lower() 543 if name.endswith("TIME"): 544 def trunc(x): return int(x) 545 else: 546 def trunc(x): return x 547 self.assertEqual(trunc(getattr(result, attr)), 548 result[getattr(stat, name)]) 549 self.assertIn(attr, members) 550 551 # Make sure that the st_?time and st_?time_ns fields roughly agree 552 # (they should always agree up to around tens-of-microseconds) 553 for name in 'st_atime st_mtime st_ctime'.split(): 554 floaty = int(getattr(result, name) * 100000) 555 nanosecondy = getattr(result, name + "_ns") // 10000 556 self.assertAlmostEqual(floaty, nanosecondy, delta=2) 557 558 try: 559 result[200] 560 self.fail("No exception raised") 561 except IndexError: 562 pass 563 564 # Make sure that assignment fails 565 try: 566 result.st_mode = 1 567 self.fail("No exception raised") 568 except AttributeError: 569 pass 570 571 try: 572 result.st_rdev = 1 573 self.fail("No exception raised") 574 except (AttributeError, TypeError): 575 pass 576 577 try: 578 result.parrot = 1 579 self.fail("No exception raised") 580 except AttributeError: 581 pass 582 583 # Use the stat_result constructor with a too-short tuple. 584 try: 585 result2 = os.stat_result((10,)) 586 self.fail("No exception raised") 587 except TypeError: 588 pass 589 590 # Use the constructor with a too-long tuple. 591 try: 592 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 593 except TypeError: 594 pass 595 596 def test_stat_attributes(self): 597 self.check_stat_attributes(self.fname) 598 599 def test_stat_attributes_bytes(self): 600 try: 601 fname = self.fname.encode(sys.getfilesystemencoding()) 602 except UnicodeEncodeError: 603 self.skipTest("cannot encode %a for the filesystem" % self.fname) 604 self.check_stat_attributes(fname) 605 606 def test_stat_result_pickle(self): 607 result = os.stat(self.fname) 608 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 609 p = pickle.dumps(result, proto) 610 self.assertIn(b'stat_result', p) 611 if proto < 4: 612 self.assertIn(b'cos\nstat_result\n', p) 613 unpickled = pickle.loads(p) 614 self.assertEqual(result, unpickled) 615 616 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') 617 def test_statvfs_attributes(self): 618 result = os.statvfs(self.fname) 619 620 # Make sure direct access works 621 self.assertEqual(result.f_bfree, result[3]) 622 623 # Make sure all the attributes are there. 624 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 625 'ffree', 'favail', 'flag', 'namemax') 626 for value, member in enumerate(members): 627 self.assertEqual(getattr(result, 'f_' + member), result[value]) 628 629 self.assertTrue(isinstance(result.f_fsid, int)) 630 631 # Test that the size of the tuple doesn't change 632 self.assertEqual(len(result), 10) 633 634 # Make sure that assignment really fails 635 try: 636 result.f_bfree = 1 637 self.fail("No exception raised") 638 except AttributeError: 639 pass 640 641 try: 642 result.parrot = 1 643 self.fail("No exception raised") 644 except AttributeError: 645 pass 646 647 # Use the constructor with a too-short tuple. 648 try: 649 result2 = os.statvfs_result((10,)) 650 self.fail("No exception raised") 651 except TypeError: 652 pass 653 654 # Use the constructor with a too-long tuple. 655 try: 656 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 657 except TypeError: 658 pass 659 660 @unittest.skipUnless(hasattr(os, 'statvfs'), 661 "need os.statvfs()") 662 def test_statvfs_result_pickle(self): 663 result = os.statvfs(self.fname) 664 665 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 666 p = pickle.dumps(result, proto) 667 self.assertIn(b'statvfs_result', p) 668 if proto < 4: 669 self.assertIn(b'cos\nstatvfs_result\n', p) 670 unpickled = pickle.loads(p) 671 self.assertEqual(result, unpickled) 672 673 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 674 def test_1686475(self): 675 # Verify that an open file can be stat'ed 676 try: 677 os.stat(r"c:\pagefile.sys") 678 except FileNotFoundError: 679 self.skipTest(r'c:\pagefile.sys does not exist') 680 except OSError as e: 681 self.fail("Could not stat pagefile.sys") 682 683 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 684 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 685 def test_15261(self): 686 # Verify that stat'ing a closed fd does not cause crash 687 r, w = os.pipe() 688 try: 689 os.stat(r) # should not raise error 690 finally: 691 os.close(r) 692 os.close(w) 693 with self.assertRaises(OSError) as ctx: 694 os.stat(r) 695 self.assertEqual(ctx.exception.errno, errno.EBADF) 696 697 def check_file_attributes(self, result): 698 self.assertTrue(hasattr(result, 'st_file_attributes')) 699 self.assertTrue(isinstance(result.st_file_attributes, int)) 700 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF) 701 702 @unittest.skipUnless(sys.platform == "win32", 703 "st_file_attributes is Win32 specific") 704 def test_file_attributes(self): 705 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set) 706 result = os.stat(self.fname) 707 self.check_file_attributes(result) 708 self.assertEqual( 709 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 710 0) 711 712 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set) 713 dirname = os_helper.TESTFN + "dir" 714 os.mkdir(dirname) 715 self.addCleanup(os.rmdir, dirname) 716 717 result = os.stat(dirname) 718 self.check_file_attributes(result) 719 self.assertEqual( 720 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 721 stat.FILE_ATTRIBUTE_DIRECTORY) 722 723 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 724 def test_access_denied(self): 725 # Default to FindFirstFile WIN32_FIND_DATA when access is 726 # denied. See issue 28075. 727 # os.environ['TEMP'] should be located on a volume that 728 # supports file ACLs. 729 fname = os.path.join(os.environ['TEMP'], self.fname) 730 self.addCleanup(os_helper.unlink, fname) 731 create_file(fname, b'ABC') 732 # Deny the right to [S]YNCHRONIZE on the file to 733 # force CreateFile to fail with ERROR_ACCESS_DENIED. 734 DETACHED_PROCESS = 8 735 subprocess.check_call( 736 # bpo-30584: Use security identifier *S-1-5-32-545 instead 737 # of localized "Users" to not depend on the locale. 738 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'], 739 creationflags=DETACHED_PROCESS 740 ) 741 result = os.stat(fname) 742 self.assertNotEqual(result.st_size, 0) 743 744 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 745 def test_stat_block_device(self): 746 # bpo-38030: os.stat fails for block devices 747 # Test a filename like "//./C:" 748 fname = "//./" + os.path.splitdrive(os.getcwd())[0] 749 result = os.stat(fname) 750 self.assertEqual(result.st_mode, stat.S_IFBLK) 751 752 753class UtimeTests(unittest.TestCase): 754 def setUp(self): 755 self.dirname = os_helper.TESTFN 756 self.fname = os.path.join(self.dirname, "f1") 757 758 self.addCleanup(os_helper.rmtree, self.dirname) 759 os.mkdir(self.dirname) 760 create_file(self.fname) 761 762 def support_subsecond(self, filename): 763 # Heuristic to check if the filesystem supports timestamp with 764 # subsecond resolution: check if float and int timestamps are different 765 st = os.stat(filename) 766 return ((st.st_atime != st[7]) 767 or (st.st_mtime != st[8]) 768 or (st.st_ctime != st[9])) 769 770 def _test_utime(self, set_time, filename=None): 771 if not filename: 772 filename = self.fname 773 774 support_subsecond = self.support_subsecond(filename) 775 if support_subsecond: 776 # Timestamp with a resolution of 1 microsecond (10^-6). 777 # 778 # The resolution of the C internal function used by os.utime() 779 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable 780 # test with a resolution of 1 ns requires more work: 781 # see the issue #15745. 782 atime_ns = 1002003000 # 1.002003 seconds 783 mtime_ns = 4005006000 # 4.005006 seconds 784 else: 785 # use a resolution of 1 second 786 atime_ns = 5 * 10**9 787 mtime_ns = 8 * 10**9 788 789 set_time(filename, (atime_ns, mtime_ns)) 790 st = os.stat(filename) 791 792 if support_subsecond: 793 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6) 794 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6) 795 else: 796 self.assertEqual(st.st_atime, atime_ns * 1e-9) 797 self.assertEqual(st.st_mtime, mtime_ns * 1e-9) 798 self.assertEqual(st.st_atime_ns, atime_ns) 799 self.assertEqual(st.st_mtime_ns, mtime_ns) 800 801 def test_utime(self): 802 def set_time(filename, ns): 803 # test the ns keyword parameter 804 os.utime(filename, ns=ns) 805 self._test_utime(set_time) 806 807 @staticmethod 808 def ns_to_sec(ns): 809 # Convert a number of nanosecond (int) to a number of seconds (float). 810 # Round towards infinity by adding 0.5 nanosecond to avoid rounding 811 # issue, os.utime() rounds towards minus infinity. 812 return (ns * 1e-9) + 0.5e-9 813 814 def test_utime_by_indexed(self): 815 # pass times as floating point seconds as the second indexed parameter 816 def set_time(filename, ns): 817 atime_ns, mtime_ns = ns 818 atime = self.ns_to_sec(atime_ns) 819 mtime = self.ns_to_sec(mtime_ns) 820 # test utimensat(timespec), utimes(timeval), utime(utimbuf) 821 # or utime(time_t) 822 os.utime(filename, (atime, mtime)) 823 self._test_utime(set_time) 824 825 def test_utime_by_times(self): 826 def set_time(filename, ns): 827 atime_ns, mtime_ns = ns 828 atime = self.ns_to_sec(atime_ns) 829 mtime = self.ns_to_sec(mtime_ns) 830 # test the times keyword parameter 831 os.utime(filename, times=(atime, mtime)) 832 self._test_utime(set_time) 833 834 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, 835 "follow_symlinks support for utime required " 836 "for this test.") 837 def test_utime_nofollow_symlinks(self): 838 def set_time(filename, ns): 839 # use follow_symlinks=False to test utimensat(timespec) 840 # or lutimes(timeval) 841 os.utime(filename, ns=ns, follow_symlinks=False) 842 self._test_utime(set_time) 843 844 @unittest.skipUnless(os.utime in os.supports_fd, 845 "fd support for utime required for this test.") 846 def test_utime_fd(self): 847 def set_time(filename, ns): 848 with open(filename, 'wb', 0) as fp: 849 # use a file descriptor to test futimens(timespec) 850 # or futimes(timeval) 851 os.utime(fp.fileno(), ns=ns) 852 self._test_utime(set_time) 853 854 @unittest.skipUnless(os.utime in os.supports_dir_fd, 855 "dir_fd support for utime required for this test.") 856 def test_utime_dir_fd(self): 857 def set_time(filename, ns): 858 dirname, name = os.path.split(filename) 859 with os_helper.open_dir_fd(dirname) as dirfd: 860 # pass dir_fd to test utimensat(timespec) or futimesat(timeval) 861 os.utime(name, dir_fd=dirfd, ns=ns) 862 self._test_utime(set_time) 863 864 def test_utime_directory(self): 865 def set_time(filename, ns): 866 # test calling os.utime() on a directory 867 os.utime(filename, ns=ns) 868 self._test_utime(set_time, filename=self.dirname) 869 870 def _test_utime_current(self, set_time): 871 # Get the system clock 872 current = time.time() 873 874 # Call os.utime() to set the timestamp to the current system clock 875 set_time(self.fname) 876 877 if not self.support_subsecond(self.fname): 878 delta = 1.0 879 else: 880 # On Windows, the usual resolution of time.time() is 15.6 ms. 881 # bpo-30649: Tolerate 50 ms for slow Windows buildbots. 882 # 883 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use 884 # also 50 ms on other platforms. 885 delta = 0.050 886 st = os.stat(self.fname) 887 msg = ("st_time=%r, current=%r, dt=%r" 888 % (st.st_mtime, current, st.st_mtime - current)) 889 self.assertAlmostEqual(st.st_mtime, current, 890 delta=delta, msg=msg) 891 892 def test_utime_current(self): 893 def set_time(filename): 894 # Set to the current time in the new way 895 os.utime(self.fname) 896 self._test_utime_current(set_time) 897 898 def test_utime_current_old(self): 899 def set_time(filename): 900 # Set to the current time in the old explicit way. 901 os.utime(self.fname, None) 902 self._test_utime_current(set_time) 903 904 def get_file_system(self, path): 905 if sys.platform == 'win32': 906 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' 907 import ctypes 908 kernel32 = ctypes.windll.kernel32 909 buf = ctypes.create_unicode_buffer("", 100) 910 ok = kernel32.GetVolumeInformationW(root, None, 0, 911 None, None, None, 912 buf, len(buf)) 913 if ok: 914 return buf.value 915 # return None if the filesystem is unknown 916 917 def test_large_time(self): 918 # Many filesystems are limited to the year 2038. At least, the test 919 # pass with NTFS filesystem. 920 if self.get_file_system(self.dirname) != "NTFS": 921 self.skipTest("requires NTFS") 922 923 large = 5000000000 # some day in 2128 924 os.utime(self.fname, (large, large)) 925 self.assertEqual(os.stat(self.fname).st_mtime, large) 926 927 def test_utime_invalid_arguments(self): 928 # seconds and nanoseconds parameters are mutually exclusive 929 with self.assertRaises(ValueError): 930 os.utime(self.fname, (5, 5), ns=(5, 5)) 931 with self.assertRaises(TypeError): 932 os.utime(self.fname, [5, 5]) 933 with self.assertRaises(TypeError): 934 os.utime(self.fname, (5,)) 935 with self.assertRaises(TypeError): 936 os.utime(self.fname, (5, 5, 5)) 937 with self.assertRaises(TypeError): 938 os.utime(self.fname, ns=[5, 5]) 939 with self.assertRaises(TypeError): 940 os.utime(self.fname, ns=(5,)) 941 with self.assertRaises(TypeError): 942 os.utime(self.fname, ns=(5, 5, 5)) 943 944 if os.utime not in os.supports_follow_symlinks: 945 with self.assertRaises(NotImplementedError): 946 os.utime(self.fname, (5, 5), follow_symlinks=False) 947 if os.utime not in os.supports_fd: 948 with open(self.fname, 'wb', 0) as fp: 949 with self.assertRaises(TypeError): 950 os.utime(fp.fileno(), (5, 5)) 951 if os.utime not in os.supports_dir_fd: 952 with self.assertRaises(NotImplementedError): 953 os.utime(self.fname, (5, 5), dir_fd=0) 954 955 @support.cpython_only 956 def test_issue31577(self): 957 # The interpreter shouldn't crash in case utime() received a bad 958 # ns argument. 959 def get_bad_int(divmod_ret_val): 960 class BadInt: 961 def __divmod__(*args): 962 return divmod_ret_val 963 return BadInt() 964 with self.assertRaises(TypeError): 965 os.utime(self.fname, ns=(get_bad_int(42), 1)) 966 with self.assertRaises(TypeError): 967 os.utime(self.fname, ns=(get_bad_int(()), 1)) 968 with self.assertRaises(TypeError): 969 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1)) 970 971 972from test import mapping_tests 973 974class EnvironTests(mapping_tests.BasicTestMappingProtocol): 975 """check that os.environ object conform to mapping protocol""" 976 type2test = None 977 978 def setUp(self): 979 self.__save = dict(os.environ) 980 if os.supports_bytes_environ: 981 self.__saveb = dict(os.environb) 982 for key, value in self._reference().items(): 983 os.environ[key] = value 984 985 def tearDown(self): 986 os.environ.clear() 987 os.environ.update(self.__save) 988 if os.supports_bytes_environ: 989 os.environb.clear() 990 os.environb.update(self.__saveb) 991 992 def _reference(self): 993 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"} 994 995 def _empty_mapping(self): 996 os.environ.clear() 997 return os.environ 998 999 # Bug 1110478 1000 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 1001 'requires a shell') 1002 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()") 1003 @support.requires_subprocess() 1004 def test_update2(self): 1005 os.environ.clear() 1006 os.environ.update(HELLO="World") 1007 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen: 1008 value = popen.read().strip() 1009 self.assertEqual(value, "World") 1010 1011 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 1012 'requires a shell') 1013 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()") 1014 @support.requires_subprocess() 1015 def test_os_popen_iter(self): 1016 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'" 1017 % unix_shell) as popen: 1018 it = iter(popen) 1019 self.assertEqual(next(it), "line1\n") 1020 self.assertEqual(next(it), "line2\n") 1021 self.assertEqual(next(it), "line3\n") 1022 self.assertRaises(StopIteration, next, it) 1023 1024 # Verify environ keys and values from the OS are of the 1025 # correct str type. 1026 def test_keyvalue_types(self): 1027 for key, val in os.environ.items(): 1028 self.assertEqual(type(key), str) 1029 self.assertEqual(type(val), str) 1030 1031 def test_items(self): 1032 for key, value in self._reference().items(): 1033 self.assertEqual(os.environ.get(key), value) 1034 1035 # Issue 7310 1036 def test___repr__(self): 1037 """Check that the repr() of os.environ looks like environ({...}).""" 1038 env = os.environ 1039 formatted_items = ", ".join( 1040 f"{key!r}: {value!r}" 1041 for key, value in env.items() 1042 ) 1043 self.assertEqual(repr(env), f"environ({{{formatted_items}}})") 1044 1045 def test_get_exec_path(self): 1046 defpath_list = os.defpath.split(os.pathsep) 1047 test_path = ['/monty', '/python', '', '/flying/circus'] 1048 test_env = {'PATH': os.pathsep.join(test_path)} 1049 1050 saved_environ = os.environ 1051 try: 1052 os.environ = dict(test_env) 1053 # Test that defaulting to os.environ works. 1054 self.assertSequenceEqual(test_path, os.get_exec_path()) 1055 self.assertSequenceEqual(test_path, os.get_exec_path(env=None)) 1056 finally: 1057 os.environ = saved_environ 1058 1059 # No PATH environment variable 1060 self.assertSequenceEqual(defpath_list, os.get_exec_path({})) 1061 # Empty PATH environment variable 1062 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''})) 1063 # Supplied PATH environment variable 1064 self.assertSequenceEqual(test_path, os.get_exec_path(test_env)) 1065 1066 if os.supports_bytes_environ: 1067 # env cannot contain 'PATH' and b'PATH' keys 1068 try: 1069 # ignore BytesWarning warning 1070 with warnings.catch_warnings(record=True): 1071 mixed_env = {'PATH': '1', b'PATH': b'2'} 1072 except BytesWarning: 1073 # mixed_env cannot be created with python -bb 1074 pass 1075 else: 1076 self.assertRaises(ValueError, os.get_exec_path, mixed_env) 1077 1078 # bytes key and/or value 1079 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}), 1080 ['abc']) 1081 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}), 1082 ['abc']) 1083 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}), 1084 ['abc']) 1085 1086 @unittest.skipUnless(os.supports_bytes_environ, 1087 "os.environb required for this test.") 1088 def test_environb(self): 1089 # os.environ -> os.environb 1090 value = 'euro\u20ac' 1091 try: 1092 value_bytes = value.encode(sys.getfilesystemencoding(), 1093 'surrogateescape') 1094 except UnicodeEncodeError: 1095 msg = "U+20AC character is not encodable to %s" % ( 1096 sys.getfilesystemencoding(),) 1097 self.skipTest(msg) 1098 os.environ['unicode'] = value 1099 self.assertEqual(os.environ['unicode'], value) 1100 self.assertEqual(os.environb[b'unicode'], value_bytes) 1101 1102 # os.environb -> os.environ 1103 value = b'\xff' 1104 os.environb[b'bytes'] = value 1105 self.assertEqual(os.environb[b'bytes'], value) 1106 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape') 1107 self.assertEqual(os.environ['bytes'], value_str) 1108 1109 @support.requires_subprocess() 1110 def test_putenv_unsetenv(self): 1111 name = "PYTHONTESTVAR" 1112 value = "testvalue" 1113 code = f'import os; print(repr(os.environ.get({name!r})))' 1114 1115 with os_helper.EnvironmentVarGuard() as env: 1116 env.pop(name, None) 1117 1118 os.putenv(name, value) 1119 proc = subprocess.run([sys.executable, '-c', code], check=True, 1120 stdout=subprocess.PIPE, text=True) 1121 self.assertEqual(proc.stdout.rstrip(), repr(value)) 1122 1123 os.unsetenv(name) 1124 proc = subprocess.run([sys.executable, '-c', code], check=True, 1125 stdout=subprocess.PIPE, text=True) 1126 self.assertEqual(proc.stdout.rstrip(), repr(None)) 1127 1128 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415). 1129 @support.requires_mac_ver(10, 6) 1130 def test_putenv_unsetenv_error(self): 1131 # Empty variable name is invalid. 1132 # "=" and null character are not allowed in a variable name. 1133 for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'): 1134 self.assertRaises((OSError, ValueError), os.putenv, name, "value") 1135 self.assertRaises((OSError, ValueError), os.unsetenv, name) 1136 1137 if sys.platform == "win32": 1138 # On Windows, an environment variable string ("name=value" string) 1139 # is limited to 32,767 characters 1140 longstr = 'x' * 32_768 1141 self.assertRaises(ValueError, os.putenv, longstr, "1") 1142 self.assertRaises(ValueError, os.putenv, "X", longstr) 1143 self.assertRaises(ValueError, os.unsetenv, longstr) 1144 1145 def test_key_type(self): 1146 missing = 'missingkey' 1147 self.assertNotIn(missing, os.environ) 1148 1149 with self.assertRaises(KeyError) as cm: 1150 os.environ[missing] 1151 self.assertIs(cm.exception.args[0], missing) 1152 self.assertTrue(cm.exception.__suppress_context__) 1153 1154 with self.assertRaises(KeyError) as cm: 1155 del os.environ[missing] 1156 self.assertIs(cm.exception.args[0], missing) 1157 self.assertTrue(cm.exception.__suppress_context__) 1158 1159 def _test_environ_iteration(self, collection): 1160 iterator = iter(collection) 1161 new_key = "__new_key__" 1162 1163 next(iterator) # start iteration over os.environ.items 1164 1165 # add a new key in os.environ mapping 1166 os.environ[new_key] = "test_environ_iteration" 1167 1168 try: 1169 next(iterator) # force iteration over modified mapping 1170 self.assertEqual(os.environ[new_key], "test_environ_iteration") 1171 finally: 1172 del os.environ[new_key] 1173 1174 def test_iter_error_when_changing_os_environ(self): 1175 self._test_environ_iteration(os.environ) 1176 1177 def test_iter_error_when_changing_os_environ_items(self): 1178 self._test_environ_iteration(os.environ.items()) 1179 1180 def test_iter_error_when_changing_os_environ_values(self): 1181 self._test_environ_iteration(os.environ.values()) 1182 1183 def _test_underlying_process_env(self, var, expected): 1184 if not (unix_shell and os.path.exists(unix_shell)): 1185 return 1186 elif not support.has_subprocess_support: 1187 return 1188 1189 with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen: 1190 value = popen.read().strip() 1191 1192 self.assertEqual(expected, value) 1193 1194 def test_or_operator(self): 1195 overridden_key = '_TEST_VAR_' 1196 original_value = 'original_value' 1197 os.environ[overridden_key] = original_value 1198 1199 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1200 expected = dict(os.environ) 1201 expected.update(new_vars_dict) 1202 1203 actual = os.environ | new_vars_dict 1204 self.assertDictEqual(expected, actual) 1205 self.assertEqual('3', actual[overridden_key]) 1206 1207 new_vars_items = new_vars_dict.items() 1208 self.assertIs(NotImplemented, os.environ.__or__(new_vars_items)) 1209 1210 self._test_underlying_process_env('_A_', '') 1211 self._test_underlying_process_env(overridden_key, original_value) 1212 1213 def test_ior_operator(self): 1214 overridden_key = '_TEST_VAR_' 1215 os.environ[overridden_key] = 'original_value' 1216 1217 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1218 expected = dict(os.environ) 1219 expected.update(new_vars_dict) 1220 1221 os.environ |= new_vars_dict 1222 self.assertEqual(expected, os.environ) 1223 self.assertEqual('3', os.environ[overridden_key]) 1224 1225 self._test_underlying_process_env('_A_', '1') 1226 self._test_underlying_process_env(overridden_key, '3') 1227 1228 def test_ior_operator_invalid_dicts(self): 1229 os_environ_copy = os.environ.copy() 1230 with self.assertRaises(TypeError): 1231 dict_with_bad_key = {1: '_A_'} 1232 os.environ |= dict_with_bad_key 1233 1234 with self.assertRaises(TypeError): 1235 dict_with_bad_val = {'_A_': 1} 1236 os.environ |= dict_with_bad_val 1237 1238 # Check nothing was added. 1239 self.assertEqual(os_environ_copy, os.environ) 1240 1241 def test_ior_operator_key_value_iterable(self): 1242 overridden_key = '_TEST_VAR_' 1243 os.environ[overridden_key] = 'original_value' 1244 1245 new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3')) 1246 expected = dict(os.environ) 1247 expected.update(new_vars_items) 1248 1249 os.environ |= new_vars_items 1250 self.assertEqual(expected, os.environ) 1251 self.assertEqual('3', os.environ[overridden_key]) 1252 1253 self._test_underlying_process_env('_A_', '1') 1254 self._test_underlying_process_env(overridden_key, '3') 1255 1256 def test_ror_operator(self): 1257 overridden_key = '_TEST_VAR_' 1258 original_value = 'original_value' 1259 os.environ[overridden_key] = original_value 1260 1261 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1262 expected = dict(new_vars_dict) 1263 expected.update(os.environ) 1264 1265 actual = new_vars_dict | os.environ 1266 self.assertDictEqual(expected, actual) 1267 self.assertEqual(original_value, actual[overridden_key]) 1268 1269 new_vars_items = new_vars_dict.items() 1270 self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items)) 1271 1272 self._test_underlying_process_env('_A_', '') 1273 self._test_underlying_process_env(overridden_key, original_value) 1274 1275 1276class WalkTests(unittest.TestCase): 1277 """Tests for os.walk().""" 1278 1279 # Wrapper to hide minor differences between os.walk and os.fwalk 1280 # to tests both functions with the same code base 1281 def walk(self, top, **kwargs): 1282 if 'follow_symlinks' in kwargs: 1283 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 1284 return os.walk(top, **kwargs) 1285 1286 def setUp(self): 1287 join = os.path.join 1288 self.addCleanup(os_helper.rmtree, os_helper.TESTFN) 1289 1290 # Build: 1291 # TESTFN/ 1292 # TEST1/ a file kid and two directory kids 1293 # tmp1 1294 # SUB1/ a file kid and a directory kid 1295 # tmp2 1296 # SUB11/ no kids 1297 # SUB2/ a file kid and a dirsymlink kid 1298 # tmp3 1299 # SUB21/ not readable 1300 # tmp5 1301 # link/ a symlink to TESTFN.2 1302 # broken_link 1303 # broken_link2 1304 # broken_link3 1305 # TEST2/ 1306 # tmp4 a lone file 1307 self.walk_path = join(os_helper.TESTFN, "TEST1") 1308 self.sub1_path = join(self.walk_path, "SUB1") 1309 self.sub11_path = join(self.sub1_path, "SUB11") 1310 sub2_path = join(self.walk_path, "SUB2") 1311 sub21_path = join(sub2_path, "SUB21") 1312 tmp1_path = join(self.walk_path, "tmp1") 1313 tmp2_path = join(self.sub1_path, "tmp2") 1314 tmp3_path = join(sub2_path, "tmp3") 1315 tmp5_path = join(sub21_path, "tmp3") 1316 self.link_path = join(sub2_path, "link") 1317 t2_path = join(os_helper.TESTFN, "TEST2") 1318 tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4") 1319 broken_link_path = join(sub2_path, "broken_link") 1320 broken_link2_path = join(sub2_path, "broken_link2") 1321 broken_link3_path = join(sub2_path, "broken_link3") 1322 1323 # Create stuff. 1324 os.makedirs(self.sub11_path) 1325 os.makedirs(sub2_path) 1326 os.makedirs(sub21_path) 1327 os.makedirs(t2_path) 1328 1329 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path: 1330 with open(path, "x", encoding='utf-8') as f: 1331 f.write("I'm " + path + " and proud of it. Blame test_os.\n") 1332 1333 if os_helper.can_symlink(): 1334 os.symlink(os.path.abspath(t2_path), self.link_path) 1335 os.symlink('broken', broken_link_path, True) 1336 os.symlink(join('tmp3', 'broken'), broken_link2_path, True) 1337 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True) 1338 self.sub2_tree = (sub2_path, ["SUB21", "link"], 1339 ["broken_link", "broken_link2", "broken_link3", 1340 "tmp3"]) 1341 else: 1342 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"]) 1343 1344 if not support.is_emscripten: 1345 # Emscripten fails with inaccessible directory 1346 os.chmod(sub21_path, 0) 1347 try: 1348 os.listdir(sub21_path) 1349 except PermissionError: 1350 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU) 1351 else: 1352 os.chmod(sub21_path, stat.S_IRWXU) 1353 os.unlink(tmp5_path) 1354 os.rmdir(sub21_path) 1355 del self.sub2_tree[1][:1] 1356 1357 def test_walk_topdown(self): 1358 # Walk top-down. 1359 all = list(self.walk(self.walk_path)) 1360 1361 self.assertEqual(len(all), 4) 1362 # We can't know which order SUB1 and SUB2 will appear in. 1363 # Not flipped: TESTFN, SUB1, SUB11, SUB2 1364 # flipped: TESTFN, SUB2, SUB1, SUB11 1365 flipped = all[0][1][0] != "SUB1" 1366 all[0][1].sort() 1367 all[3 - 2 * flipped][-1].sort() 1368 all[3 - 2 * flipped][1].sort() 1369 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 1370 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"])) 1371 self.assertEqual(all[2 + flipped], (self.sub11_path, [], [])) 1372 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree) 1373 1374 def test_walk_prune(self, walk_path=None): 1375 if walk_path is None: 1376 walk_path = self.walk_path 1377 # Prune the search. 1378 all = [] 1379 for root, dirs, files in self.walk(walk_path): 1380 all.append((root, dirs, files)) 1381 # Don't descend into SUB1. 1382 if 'SUB1' in dirs: 1383 # Note that this also mutates the dirs we appended to all! 1384 dirs.remove('SUB1') 1385 1386 self.assertEqual(len(all), 2) 1387 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"])) 1388 1389 all[1][-1].sort() 1390 all[1][1].sort() 1391 self.assertEqual(all[1], self.sub2_tree) 1392 1393 def test_file_like_path(self): 1394 self.test_walk_prune(FakePath(self.walk_path)) 1395 1396 def test_walk_bottom_up(self): 1397 # Walk bottom-up. 1398 all = list(self.walk(self.walk_path, topdown=False)) 1399 1400 self.assertEqual(len(all), 4, all) 1401 # We can't know which order SUB1 and SUB2 will appear in. 1402 # Not flipped: SUB11, SUB1, SUB2, TESTFN 1403 # flipped: SUB2, SUB11, SUB1, TESTFN 1404 flipped = all[3][1][0] != "SUB1" 1405 all[3][1].sort() 1406 all[2 - 2 * flipped][-1].sort() 1407 all[2 - 2 * flipped][1].sort() 1408 self.assertEqual(all[3], 1409 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 1410 self.assertEqual(all[flipped], 1411 (self.sub11_path, [], [])) 1412 self.assertEqual(all[flipped + 1], 1413 (self.sub1_path, ["SUB11"], ["tmp2"])) 1414 self.assertEqual(all[2 - 2 * flipped], 1415 self.sub2_tree) 1416 1417 def test_walk_symlink(self): 1418 if not os_helper.can_symlink(): 1419 self.skipTest("need symlink support") 1420 1421 # Walk, following symlinks. 1422 walk_it = self.walk(self.walk_path, follow_symlinks=True) 1423 for root, dirs, files in walk_it: 1424 if root == self.link_path: 1425 self.assertEqual(dirs, []) 1426 self.assertEqual(files, ["tmp4"]) 1427 break 1428 else: 1429 self.fail("Didn't follow symlink with followlinks=True") 1430 1431 def test_walk_bad_dir(self): 1432 # Walk top-down. 1433 errors = [] 1434 walk_it = self.walk(self.walk_path, onerror=errors.append) 1435 root, dirs, files = next(walk_it) 1436 self.assertEqual(errors, []) 1437 dir1 = 'SUB1' 1438 path1 = os.path.join(root, dir1) 1439 path1new = os.path.join(root, dir1 + '.new') 1440 os.rename(path1, path1new) 1441 try: 1442 roots = [r for r, d, f in walk_it] 1443 self.assertTrue(errors) 1444 self.assertNotIn(path1, roots) 1445 self.assertNotIn(path1new, roots) 1446 for dir2 in dirs: 1447 if dir2 != dir1: 1448 self.assertIn(os.path.join(root, dir2), roots) 1449 finally: 1450 os.rename(path1new, path1) 1451 1452 def test_walk_many_open_files(self): 1453 depth = 30 1454 base = os.path.join(os_helper.TESTFN, 'deep') 1455 p = os.path.join(base, *(['d']*depth)) 1456 os.makedirs(p) 1457 1458 iters = [self.walk(base, topdown=False) for j in range(100)] 1459 for i in range(depth + 1): 1460 expected = (p, ['d'] if i else [], []) 1461 for it in iters: 1462 self.assertEqual(next(it), expected) 1463 p = os.path.dirname(p) 1464 1465 iters = [self.walk(base, topdown=True) for j in range(100)] 1466 p = base 1467 for i in range(depth + 1): 1468 expected = (p, ['d'] if i < depth else [], []) 1469 for it in iters: 1470 self.assertEqual(next(it), expected) 1471 p = os.path.join(p, 'd') 1472 1473 1474@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1475class FwalkTests(WalkTests): 1476 """Tests for os.fwalk().""" 1477 1478 def walk(self, top, **kwargs): 1479 for root, dirs, files, root_fd in self.fwalk(top, **kwargs): 1480 yield (root, dirs, files) 1481 1482 def fwalk(self, *args, **kwargs): 1483 return os.fwalk(*args, **kwargs) 1484 1485 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs): 1486 """ 1487 compare with walk() results. 1488 """ 1489 walk_kwargs = walk_kwargs.copy() 1490 fwalk_kwargs = fwalk_kwargs.copy() 1491 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1492 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks) 1493 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks) 1494 1495 expected = {} 1496 for root, dirs, files in os.walk(**walk_kwargs): 1497 expected[root] = (set(dirs), set(files)) 1498 1499 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs): 1500 self.assertIn(root, expected) 1501 self.assertEqual(expected[root], (set(dirs), set(files))) 1502 1503 def test_compare_to_walk(self): 1504 kwargs = {'top': os_helper.TESTFN} 1505 self._compare_to_walk(kwargs, kwargs) 1506 1507 def test_dir_fd(self): 1508 try: 1509 fd = os.open(".", os.O_RDONLY) 1510 walk_kwargs = {'top': os_helper.TESTFN} 1511 fwalk_kwargs = walk_kwargs.copy() 1512 fwalk_kwargs['dir_fd'] = fd 1513 self._compare_to_walk(walk_kwargs, fwalk_kwargs) 1514 finally: 1515 os.close(fd) 1516 1517 def test_yields_correct_dir_fd(self): 1518 # check returned file descriptors 1519 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1520 args = os_helper.TESTFN, topdown, None 1521 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks): 1522 # check that the FD is valid 1523 os.fstat(rootfd) 1524 # redundant check 1525 os.stat(rootfd) 1526 # check that listdir() returns consistent information 1527 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files)) 1528 1529 @unittest.skipIf( 1530 support.is_emscripten, "Cannot dup stdout on Emscripten" 1531 ) 1532 def test_fd_leak(self): 1533 # Since we're opening a lot of FDs, we must be careful to avoid leaks: 1534 # we both check that calling fwalk() a large number of times doesn't 1535 # yield EMFILE, and that the minimum allocated FD hasn't changed. 1536 minfd = os.dup(1) 1537 os.close(minfd) 1538 for i in range(256): 1539 for x in self.fwalk(os_helper.TESTFN): 1540 pass 1541 newfd = os.dup(1) 1542 self.addCleanup(os.close, newfd) 1543 self.assertEqual(newfd, minfd) 1544 1545 # fwalk() keeps file descriptors open 1546 test_walk_many_open_files = None 1547 1548 1549class BytesWalkTests(WalkTests): 1550 """Tests for os.walk() with bytes.""" 1551 def walk(self, top, **kwargs): 1552 if 'follow_symlinks' in kwargs: 1553 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 1554 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs): 1555 root = os.fsdecode(broot) 1556 dirs = list(map(os.fsdecode, bdirs)) 1557 files = list(map(os.fsdecode, bfiles)) 1558 yield (root, dirs, files) 1559 bdirs[:] = list(map(os.fsencode, dirs)) 1560 bfiles[:] = list(map(os.fsencode, files)) 1561 1562@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1563class BytesFwalkTests(FwalkTests): 1564 """Tests for os.walk() with bytes.""" 1565 def fwalk(self, top='.', *args, **kwargs): 1566 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs): 1567 root = os.fsdecode(broot) 1568 dirs = list(map(os.fsdecode, bdirs)) 1569 files = list(map(os.fsdecode, bfiles)) 1570 yield (root, dirs, files, topfd) 1571 bdirs[:] = list(map(os.fsencode, dirs)) 1572 bfiles[:] = list(map(os.fsencode, files)) 1573 1574 1575class MakedirTests(unittest.TestCase): 1576 def setUp(self): 1577 os.mkdir(os_helper.TESTFN) 1578 1579 def test_makedir(self): 1580 base = os_helper.TESTFN 1581 path = os.path.join(base, 'dir1', 'dir2', 'dir3') 1582 os.makedirs(path) # Should work 1583 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4') 1584 os.makedirs(path) 1585 1586 # Try paths with a '.' in them 1587 self.assertRaises(OSError, os.makedirs, os.curdir) 1588 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir) 1589 os.makedirs(path) 1590 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4', 1591 'dir5', 'dir6') 1592 os.makedirs(path) 1593 1594 @unittest.skipIf( 1595 support.is_emscripten or support.is_wasi, 1596 "Emscripten's/WASI's umask is a stub." 1597 ) 1598 def test_mode(self): 1599 with os_helper.temp_umask(0o002): 1600 base = os_helper.TESTFN 1601 parent = os.path.join(base, 'dir1') 1602 path = os.path.join(parent, 'dir2') 1603 os.makedirs(path, 0o555) 1604 self.assertTrue(os.path.exists(path)) 1605 self.assertTrue(os.path.isdir(path)) 1606 if os.name != 'nt': 1607 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555) 1608 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775) 1609 1610 @unittest.skipIf( 1611 support.is_emscripten or support.is_wasi, 1612 "Emscripten's/WASI's umask is a stub." 1613 ) 1614 def test_exist_ok_existing_directory(self): 1615 path = os.path.join(os_helper.TESTFN, 'dir1') 1616 mode = 0o777 1617 old_mask = os.umask(0o022) 1618 os.makedirs(path, mode) 1619 self.assertRaises(OSError, os.makedirs, path, mode) 1620 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False) 1621 os.makedirs(path, 0o776, exist_ok=True) 1622 os.makedirs(path, mode=mode, exist_ok=True) 1623 os.umask(old_mask) 1624 1625 # Issue #25583: A drive root could raise PermissionError on Windows 1626 os.makedirs(os.path.abspath('/'), exist_ok=True) 1627 1628 @unittest.skipIf( 1629 support.is_emscripten or support.is_wasi, 1630 "Emscripten's/WASI's umask is a stub." 1631 ) 1632 def test_exist_ok_s_isgid_directory(self): 1633 path = os.path.join(os_helper.TESTFN, 'dir1') 1634 S_ISGID = stat.S_ISGID 1635 mode = 0o777 1636 old_mask = os.umask(0o022) 1637 try: 1638 existing_testfn_mode = stat.S_IMODE( 1639 os.lstat(os_helper.TESTFN).st_mode) 1640 try: 1641 os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID) 1642 except PermissionError: 1643 raise unittest.SkipTest('Cannot set S_ISGID for dir.') 1644 if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID): 1645 raise unittest.SkipTest('No support for S_ISGID dir mode.') 1646 # The os should apply S_ISGID from the parent dir for us, but 1647 # this test need not depend on that behavior. Be explicit. 1648 os.makedirs(path, mode | S_ISGID) 1649 # http://bugs.python.org/issue14992 1650 # Should not fail when the bit is already set. 1651 os.makedirs(path, mode, exist_ok=True) 1652 # remove the bit. 1653 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID) 1654 # May work even when the bit is not already set when demanded. 1655 os.makedirs(path, mode | S_ISGID, exist_ok=True) 1656 finally: 1657 os.umask(old_mask) 1658 1659 def test_exist_ok_existing_regular_file(self): 1660 base = os_helper.TESTFN 1661 path = os.path.join(os_helper.TESTFN, 'dir1') 1662 with open(path, 'w', encoding='utf-8') as f: 1663 f.write('abc') 1664 self.assertRaises(OSError, os.makedirs, path) 1665 self.assertRaises(OSError, os.makedirs, path, exist_ok=False) 1666 self.assertRaises(OSError, os.makedirs, path, exist_ok=True) 1667 os.remove(path) 1668 1669 @unittest.skipUnless(os.name == 'nt', "requires Windows") 1670 def test_win32_mkdir_700(self): 1671 base = os_helper.TESTFN 1672 path = os.path.abspath(os.path.join(os_helper.TESTFN, 'dir')) 1673 os.mkdir(path, mode=0o700) 1674 out = subprocess.check_output(["cacls.exe", path, "/s"], encoding="oem") 1675 os.rmdir(path) 1676 self.assertEqual( 1677 out.strip(), 1678 f'{path} "D:P(A;OICI;FA;;;SY)(A;OICI;FA;;;BA)(A;OICI;FA;;;OW)"', 1679 ) 1680 1681 def tearDown(self): 1682 path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3', 1683 'dir4', 'dir5', 'dir6') 1684 # If the tests failed, the bottom-most directory ('../dir6') 1685 # may not have been created, so we look for the outermost directory 1686 # that exists. 1687 while not os.path.exists(path) and path != os_helper.TESTFN: 1688 path = os.path.dirname(path) 1689 1690 os.removedirs(path) 1691 1692 1693@os_helper.skip_unless_working_chmod 1694class ChownFileTests(unittest.TestCase): 1695 1696 @classmethod 1697 def setUpClass(cls): 1698 os.mkdir(os_helper.TESTFN) 1699 1700 def test_chown_uid_gid_arguments_must_be_index(self): 1701 stat = os.stat(os_helper.TESTFN) 1702 uid = stat.st_uid 1703 gid = stat.st_gid 1704 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): 1705 self.assertRaises(TypeError, os.chown, os_helper.TESTFN, value, gid) 1706 self.assertRaises(TypeError, os.chown, os_helper.TESTFN, uid, value) 1707 self.assertIsNone(os.chown(os_helper.TESTFN, uid, gid)) 1708 self.assertIsNone(os.chown(os_helper.TESTFN, -1, -1)) 1709 1710 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups') 1711 def test_chown_gid(self): 1712 groups = os.getgroups() 1713 if len(groups) < 2: 1714 self.skipTest("test needs at least 2 groups") 1715 1716 gid_1, gid_2 = groups[:2] 1717 uid = os.stat(os_helper.TESTFN).st_uid 1718 1719 os.chown(os_helper.TESTFN, uid, gid_1) 1720 gid = os.stat(os_helper.TESTFN).st_gid 1721 self.assertEqual(gid, gid_1) 1722 1723 os.chown(os_helper.TESTFN, uid, gid_2) 1724 gid = os.stat(os_helper.TESTFN).st_gid 1725 self.assertEqual(gid, gid_2) 1726 1727 @unittest.skipUnless(root_in_posix and len(all_users) > 1, 1728 "test needs root privilege and more than one user") 1729 def test_chown_with_root(self): 1730 uid_1, uid_2 = all_users[:2] 1731 gid = os.stat(os_helper.TESTFN).st_gid 1732 os.chown(os_helper.TESTFN, uid_1, gid) 1733 uid = os.stat(os_helper.TESTFN).st_uid 1734 self.assertEqual(uid, uid_1) 1735 os.chown(os_helper.TESTFN, uid_2, gid) 1736 uid = os.stat(os_helper.TESTFN).st_uid 1737 self.assertEqual(uid, uid_2) 1738 1739 @unittest.skipUnless(not root_in_posix and len(all_users) > 1, 1740 "test needs non-root account and more than one user") 1741 def test_chown_without_permission(self): 1742 uid_1, uid_2 = all_users[:2] 1743 gid = os.stat(os_helper.TESTFN).st_gid 1744 with self.assertRaises(PermissionError): 1745 os.chown(os_helper.TESTFN, uid_1, gid) 1746 os.chown(os_helper.TESTFN, uid_2, gid) 1747 1748 @classmethod 1749 def tearDownClass(cls): 1750 os.rmdir(os_helper.TESTFN) 1751 1752 1753class RemoveDirsTests(unittest.TestCase): 1754 def setUp(self): 1755 os.makedirs(os_helper.TESTFN) 1756 1757 def tearDown(self): 1758 os_helper.rmtree(os_helper.TESTFN) 1759 1760 def test_remove_all(self): 1761 dira = os.path.join(os_helper.TESTFN, 'dira') 1762 os.mkdir(dira) 1763 dirb = os.path.join(dira, 'dirb') 1764 os.mkdir(dirb) 1765 os.removedirs(dirb) 1766 self.assertFalse(os.path.exists(dirb)) 1767 self.assertFalse(os.path.exists(dira)) 1768 self.assertFalse(os.path.exists(os_helper.TESTFN)) 1769 1770 def test_remove_partial(self): 1771 dira = os.path.join(os_helper.TESTFN, 'dira') 1772 os.mkdir(dira) 1773 dirb = os.path.join(dira, 'dirb') 1774 os.mkdir(dirb) 1775 create_file(os.path.join(dira, 'file.txt')) 1776 os.removedirs(dirb) 1777 self.assertFalse(os.path.exists(dirb)) 1778 self.assertTrue(os.path.exists(dira)) 1779 self.assertTrue(os.path.exists(os_helper.TESTFN)) 1780 1781 def test_remove_nothing(self): 1782 dira = os.path.join(os_helper.TESTFN, 'dira') 1783 os.mkdir(dira) 1784 dirb = os.path.join(dira, 'dirb') 1785 os.mkdir(dirb) 1786 create_file(os.path.join(dirb, 'file.txt')) 1787 with self.assertRaises(OSError): 1788 os.removedirs(dirb) 1789 self.assertTrue(os.path.exists(dirb)) 1790 self.assertTrue(os.path.exists(dira)) 1791 self.assertTrue(os.path.exists(os_helper.TESTFN)) 1792 1793 1794@unittest.skipIf(support.is_wasi, "WASI has no /dev/null") 1795class DevNullTests(unittest.TestCase): 1796 def test_devnull(self): 1797 with open(os.devnull, 'wb', 0) as f: 1798 f.write(b'hello') 1799 f.close() 1800 with open(os.devnull, 'rb') as f: 1801 self.assertEqual(f.read(), b'') 1802 1803 1804class URandomTests(unittest.TestCase): 1805 def test_urandom_length(self): 1806 self.assertEqual(len(os.urandom(0)), 0) 1807 self.assertEqual(len(os.urandom(1)), 1) 1808 self.assertEqual(len(os.urandom(10)), 10) 1809 self.assertEqual(len(os.urandom(100)), 100) 1810 self.assertEqual(len(os.urandom(1000)), 1000) 1811 1812 def test_urandom_value(self): 1813 data1 = os.urandom(16) 1814 self.assertIsInstance(data1, bytes) 1815 data2 = os.urandom(16) 1816 self.assertNotEqual(data1, data2) 1817 1818 def get_urandom_subprocess(self, count): 1819 code = '\n'.join(( 1820 'import os, sys', 1821 'data = os.urandom(%s)' % count, 1822 'sys.stdout.buffer.write(data)', 1823 'sys.stdout.buffer.flush()')) 1824 out = assert_python_ok('-c', code) 1825 stdout = out[1] 1826 self.assertEqual(len(stdout), count) 1827 return stdout 1828 1829 def test_urandom_subprocess(self): 1830 data1 = self.get_urandom_subprocess(16) 1831 data2 = self.get_urandom_subprocess(16) 1832 self.assertNotEqual(data1, data2) 1833 1834 1835@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()') 1836class GetRandomTests(unittest.TestCase): 1837 @classmethod 1838 def setUpClass(cls): 1839 try: 1840 os.getrandom(1) 1841 except OSError as exc: 1842 if exc.errno == errno.ENOSYS: 1843 # Python compiled on a more recent Linux version 1844 # than the current Linux kernel 1845 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS") 1846 else: 1847 raise 1848 1849 def test_getrandom_type(self): 1850 data = os.getrandom(16) 1851 self.assertIsInstance(data, bytes) 1852 self.assertEqual(len(data), 16) 1853 1854 def test_getrandom0(self): 1855 empty = os.getrandom(0) 1856 self.assertEqual(empty, b'') 1857 1858 def test_getrandom_random(self): 1859 self.assertTrue(hasattr(os, 'GRND_RANDOM')) 1860 1861 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare 1862 # resource /dev/random 1863 1864 def test_getrandom_nonblock(self): 1865 # The call must not fail. Check also that the flag exists 1866 try: 1867 os.getrandom(1, os.GRND_NONBLOCK) 1868 except BlockingIOError: 1869 # System urandom is not initialized yet 1870 pass 1871 1872 def test_getrandom_value(self): 1873 data1 = os.getrandom(16) 1874 data2 = os.getrandom(16) 1875 self.assertNotEqual(data1, data2) 1876 1877 1878# os.urandom() doesn't use a file descriptor when it is implemented with the 1879# getentropy() function, the getrandom() function or the getrandom() syscall 1880OS_URANDOM_DONT_USE_FD = ( 1881 sysconfig.get_config_var('HAVE_GETENTROPY') == 1 1882 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1 1883 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1) 1884 1885@unittest.skipIf(OS_URANDOM_DONT_USE_FD , 1886 "os.random() does not use a file descriptor") 1887@unittest.skipIf(sys.platform == "vxworks", 1888 "VxWorks can't set RLIMIT_NOFILE to 1") 1889class URandomFDTests(unittest.TestCase): 1890 @unittest.skipUnless(resource, "test requires the resource module") 1891 def test_urandom_failure(self): 1892 # Check urandom() failing when it is not able to open /dev/random. 1893 # We spawn a new process to make the test more robust (if getrlimit() 1894 # failed to restore the file descriptor limit after this, the whole 1895 # test suite would crash; this actually happened on the OS X Tiger 1896 # buildbot). 1897 code = """if 1: 1898 import errno 1899 import os 1900 import resource 1901 1902 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) 1903 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit)) 1904 try: 1905 os.urandom(16) 1906 except OSError as e: 1907 assert e.errno == errno.EMFILE, e.errno 1908 else: 1909 raise AssertionError("OSError not raised") 1910 """ 1911 assert_python_ok('-c', code) 1912 1913 def test_urandom_fd_closed(self): 1914 # Issue #21207: urandom() should reopen its fd to /dev/urandom if 1915 # closed. 1916 code = """if 1: 1917 import os 1918 import sys 1919 import test.support 1920 os.urandom(4) 1921 with test.support.SuppressCrashReport(): 1922 os.closerange(3, 256) 1923 sys.stdout.buffer.write(os.urandom(4)) 1924 """ 1925 rc, out, err = assert_python_ok('-Sc', code) 1926 1927 def test_urandom_fd_reopened(self): 1928 # Issue #21207: urandom() should detect its fd to /dev/urandom 1929 # changed to something else, and reopen it. 1930 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1931 create_file(os_helper.TESTFN, b"x" * 256) 1932 1933 code = """if 1: 1934 import os 1935 import sys 1936 import test.support 1937 os.urandom(4) 1938 with test.support.SuppressCrashReport(): 1939 for fd in range(3, 256): 1940 try: 1941 os.close(fd) 1942 except OSError: 1943 pass 1944 else: 1945 # Found the urandom fd (XXX hopefully) 1946 break 1947 os.closerange(3, 256) 1948 with open({TESTFN!r}, 'rb') as f: 1949 new_fd = f.fileno() 1950 # Issue #26935: posix allows new_fd and fd to be equal but 1951 # some libc implementations have dup2 return an error in this 1952 # case. 1953 if new_fd != fd: 1954 os.dup2(new_fd, fd) 1955 sys.stdout.buffer.write(os.urandom(4)) 1956 sys.stdout.buffer.write(os.urandom(4)) 1957 """.format(TESTFN=os_helper.TESTFN) 1958 rc, out, err = assert_python_ok('-Sc', code) 1959 self.assertEqual(len(out), 8) 1960 self.assertNotEqual(out[0:4], out[4:8]) 1961 rc, out2, err2 = assert_python_ok('-Sc', code) 1962 self.assertEqual(len(out2), 8) 1963 self.assertNotEqual(out2, out) 1964 1965 1966@contextlib.contextmanager 1967def _execvpe_mockup(defpath=None): 1968 """ 1969 Stubs out execv and execve functions when used as context manager. 1970 Records exec calls. The mock execv and execve functions always raise an 1971 exception as they would normally never return. 1972 """ 1973 # A list of tuples containing (function name, first arg, args) 1974 # of calls to execv or execve that have been made. 1975 calls = [] 1976 1977 def mock_execv(name, *args): 1978 calls.append(('execv', name, args)) 1979 raise RuntimeError("execv called") 1980 1981 def mock_execve(name, *args): 1982 calls.append(('execve', name, args)) 1983 raise OSError(errno.ENOTDIR, "execve called") 1984 1985 try: 1986 orig_execv = os.execv 1987 orig_execve = os.execve 1988 orig_defpath = os.defpath 1989 os.execv = mock_execv 1990 os.execve = mock_execve 1991 if defpath is not None: 1992 os.defpath = defpath 1993 yield calls 1994 finally: 1995 os.execv = orig_execv 1996 os.execve = orig_execve 1997 os.defpath = orig_defpath 1998 1999@unittest.skipUnless(hasattr(os, 'execv'), 2000 "need os.execv()") 2001class ExecTests(unittest.TestCase): 2002 @unittest.skipIf(USING_LINUXTHREADS, 2003 "avoid triggering a linuxthreads bug: see issue #4970") 2004 def test_execvpe_with_bad_program(self): 2005 self.assertRaises(OSError, os.execvpe, 'no such app-', 2006 ['no such app-'], None) 2007 2008 def test_execv_with_bad_arglist(self): 2009 self.assertRaises(ValueError, os.execv, 'notepad', ()) 2010 self.assertRaises(ValueError, os.execv, 'notepad', []) 2011 self.assertRaises(ValueError, os.execv, 'notepad', ('',)) 2012 self.assertRaises(ValueError, os.execv, 'notepad', ['']) 2013 2014 def test_execvpe_with_bad_arglist(self): 2015 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None) 2016 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {}) 2017 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {}) 2018 2019 @unittest.skipUnless(hasattr(os, '_execvpe'), 2020 "No internal os._execvpe function to test.") 2021 def _test_internal_execvpe(self, test_type): 2022 program_path = os.sep + 'absolutepath' 2023 if test_type is bytes: 2024 program = b'executable' 2025 fullpath = os.path.join(os.fsencode(program_path), program) 2026 native_fullpath = fullpath 2027 arguments = [b'progname', 'arg1', 'arg2'] 2028 else: 2029 program = 'executable' 2030 arguments = ['progname', 'arg1', 'arg2'] 2031 fullpath = os.path.join(program_path, program) 2032 if os.name != "nt": 2033 native_fullpath = os.fsencode(fullpath) 2034 else: 2035 native_fullpath = fullpath 2036 env = {'spam': 'beans'} 2037 2038 # test os._execvpe() with an absolute path 2039 with _execvpe_mockup() as calls: 2040 self.assertRaises(RuntimeError, 2041 os._execvpe, fullpath, arguments) 2042 self.assertEqual(len(calls), 1) 2043 self.assertEqual(calls[0], ('execv', fullpath, (arguments,))) 2044 2045 # test os._execvpe() with a relative path: 2046 # os.get_exec_path() returns defpath 2047 with _execvpe_mockup(defpath=program_path) as calls: 2048 self.assertRaises(OSError, 2049 os._execvpe, program, arguments, env=env) 2050 self.assertEqual(len(calls), 1) 2051 self.assertSequenceEqual(calls[0], 2052 ('execve', native_fullpath, (arguments, env))) 2053 2054 # test os._execvpe() with a relative path: 2055 # os.get_exec_path() reads the 'PATH' variable 2056 with _execvpe_mockup() as calls: 2057 env_path = env.copy() 2058 if test_type is bytes: 2059 env_path[b'PATH'] = program_path 2060 else: 2061 env_path['PATH'] = program_path 2062 self.assertRaises(OSError, 2063 os._execvpe, program, arguments, env=env_path) 2064 self.assertEqual(len(calls), 1) 2065 self.assertSequenceEqual(calls[0], 2066 ('execve', native_fullpath, (arguments, env_path))) 2067 2068 def test_internal_execvpe_str(self): 2069 self._test_internal_execvpe(str) 2070 if os.name != "nt": 2071 self._test_internal_execvpe(bytes) 2072 2073 def test_execve_invalid_env(self): 2074 args = [sys.executable, '-c', 'pass'] 2075 2076 # null character in the environment variable name 2077 newenv = os.environ.copy() 2078 newenv["FRUIT\0VEGETABLE"] = "cabbage" 2079 with self.assertRaises(ValueError): 2080 os.execve(args[0], args, newenv) 2081 2082 # null character in the environment variable value 2083 newenv = os.environ.copy() 2084 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 2085 with self.assertRaises(ValueError): 2086 os.execve(args[0], args, newenv) 2087 2088 # equal character in the environment variable name 2089 newenv = os.environ.copy() 2090 newenv["FRUIT=ORANGE"] = "lemon" 2091 with self.assertRaises(ValueError): 2092 os.execve(args[0], args, newenv) 2093 2094 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test") 2095 def test_execve_with_empty_path(self): 2096 # bpo-32890: Check GetLastError() misuse 2097 try: 2098 os.execve('', ['arg'], {}) 2099 except OSError as e: 2100 self.assertTrue(e.winerror is None or e.winerror != 0) 2101 else: 2102 self.fail('No OSError raised') 2103 2104 2105@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2106class Win32ErrorTests(unittest.TestCase): 2107 def setUp(self): 2108 try: 2109 os.stat(os_helper.TESTFN) 2110 except FileNotFoundError: 2111 exists = False 2112 except OSError as exc: 2113 exists = True 2114 self.fail("file %s must not exist; os.stat failed with %s" 2115 % (os_helper.TESTFN, exc)) 2116 else: 2117 self.fail("file %s must not exist" % os_helper.TESTFN) 2118 2119 def test_rename(self): 2120 self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak") 2121 2122 def test_remove(self): 2123 self.assertRaises(OSError, os.remove, os_helper.TESTFN) 2124 2125 def test_chdir(self): 2126 self.assertRaises(OSError, os.chdir, os_helper.TESTFN) 2127 2128 def test_mkdir(self): 2129 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 2130 2131 with open(os_helper.TESTFN, "x") as f: 2132 self.assertRaises(OSError, os.mkdir, os_helper.TESTFN) 2133 2134 def test_utime(self): 2135 self.assertRaises(OSError, os.utime, os_helper.TESTFN, None) 2136 2137 def test_chmod(self): 2138 self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0) 2139 2140 2141@unittest.skipIf(support.is_wasi, "Cannot create invalid FD on WASI.") 2142class TestInvalidFD(unittest.TestCase): 2143 singles = ["fchdir", "dup", "fdatasync", "fstat", 2144 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] 2145 #singles.append("close") 2146 #We omit close because it doesn't raise an exception on some platforms 2147 def get_single(f): 2148 def helper(self): 2149 if hasattr(os, f): 2150 self.check(getattr(os, f)) 2151 return helper 2152 for f in singles: 2153 locals()["test_"+f] = get_single(f) 2154 2155 def check(self, f, *args, **kwargs): 2156 try: 2157 f(os_helper.make_bad_fd(), *args, **kwargs) 2158 except OSError as e: 2159 self.assertEqual(e.errno, errno.EBADF) 2160 else: 2161 self.fail("%r didn't raise an OSError with a bad file descriptor" 2162 % f) 2163 2164 def test_fdopen(self): 2165 self.check(os.fdopen, encoding="utf-8") 2166 2167 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') 2168 def test_isatty(self): 2169 self.assertEqual(os.isatty(os_helper.make_bad_fd()), False) 2170 2171 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()') 2172 def test_closerange(self): 2173 fd = os_helper.make_bad_fd() 2174 # Make sure none of the descriptors we are about to close are 2175 # currently valid (issue 6542). 2176 for i in range(10): 2177 try: os.fstat(fd+i) 2178 except OSError: 2179 pass 2180 else: 2181 break 2182 if i < 2: 2183 raise unittest.SkipTest( 2184 "Unable to acquire a range of invalid file descriptors") 2185 self.assertEqual(os.closerange(fd, fd + i-1), None) 2186 2187 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') 2188 def test_dup2(self): 2189 self.check(os.dup2, 20) 2190 2191 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') 2192 @unittest.skipIf( 2193 support.is_emscripten, 2194 "dup2() with negative fds is broken on Emscripten (see gh-102179)" 2195 ) 2196 def test_dup2_negative_fd(self): 2197 valid_fd = os.open(__file__, os.O_RDONLY) 2198 self.addCleanup(os.close, valid_fd) 2199 fds = [ 2200 valid_fd, 2201 -1, 2202 -2**31, 2203 ] 2204 for fd, fd2 in itertools.product(fds, repeat=2): 2205 if fd != fd2: 2206 with self.subTest(fd=fd, fd2=fd2): 2207 with self.assertRaises(OSError) as ctx: 2208 os.dup2(fd, fd2) 2209 self.assertEqual(ctx.exception.errno, errno.EBADF) 2210 2211 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()') 2212 def test_fchmod(self): 2213 self.check(os.fchmod, 0) 2214 2215 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()') 2216 def test_fchown(self): 2217 self.check(os.fchown, -1, -1) 2218 2219 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()') 2220 @unittest.skipIf( 2221 support.is_emscripten or support.is_wasi, 2222 "musl libc issue on Emscripten/WASI, bpo-46390" 2223 ) 2224 def test_fpathconf(self): 2225 self.check(os.pathconf, "PC_NAME_MAX") 2226 self.check(os.fpathconf, "PC_NAME_MAX") 2227 2228 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()') 2229 def test_ftruncate(self): 2230 self.check(os.truncate, 0) 2231 self.check(os.ftruncate, 0) 2232 2233 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()') 2234 def test_lseek(self): 2235 self.check(os.lseek, 0, 0) 2236 2237 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()') 2238 def test_read(self): 2239 self.check(os.read, 1) 2240 2241 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()') 2242 def test_readv(self): 2243 buf = bytearray(10) 2244 self.check(os.readv, [buf]) 2245 2246 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()') 2247 def test_tcsetpgrpt(self): 2248 self.check(os.tcsetpgrp, 0) 2249 2250 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') 2251 def test_write(self): 2252 self.check(os.write, b" ") 2253 2254 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()') 2255 def test_writev(self): 2256 self.check(os.writev, [b'abc']) 2257 2258 @support.requires_subprocess() 2259 def test_inheritable(self): 2260 self.check(os.get_inheritable) 2261 self.check(os.set_inheritable, True) 2262 2263 @unittest.skipUnless(hasattr(os, 'get_blocking'), 2264 'needs os.get_blocking() and os.set_blocking()') 2265 def test_blocking(self): 2266 self.check(os.get_blocking) 2267 self.check(os.set_blocking, True) 2268 2269 2270@unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') 2271class LinkTests(unittest.TestCase): 2272 def setUp(self): 2273 self.file1 = os_helper.TESTFN 2274 self.file2 = os.path.join(os_helper.TESTFN + "2") 2275 2276 def tearDown(self): 2277 for file in (self.file1, self.file2): 2278 if os.path.exists(file): 2279 os.unlink(file) 2280 2281 def _test_link(self, file1, file2): 2282 create_file(file1) 2283 2284 try: 2285 os.link(file1, file2) 2286 except PermissionError as e: 2287 self.skipTest('os.link(): %s' % e) 2288 with open(file1, "rb") as f1, open(file2, "rb") as f2: 2289 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno())) 2290 2291 def test_link(self): 2292 self._test_link(self.file1, self.file2) 2293 2294 def test_link_bytes(self): 2295 self._test_link(bytes(self.file1, sys.getfilesystemencoding()), 2296 bytes(self.file2, sys.getfilesystemencoding())) 2297 2298 def test_unicode_name(self): 2299 try: 2300 os.fsencode("\xf1") 2301 except UnicodeError: 2302 raise unittest.SkipTest("Unable to encode for this platform.") 2303 2304 self.file1 += "\xf1" 2305 self.file2 = self.file1 + "2" 2306 self._test_link(self.file1, self.file2) 2307 2308@unittest.skipIf(sys.platform == "win32", "Posix specific tests") 2309class PosixUidGidTests(unittest.TestCase): 2310 # uid_t and gid_t are 32-bit unsigned integers on Linux 2311 UID_OVERFLOW = (1 << 32) 2312 GID_OVERFLOW = (1 << 32) 2313 2314 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') 2315 def test_setuid(self): 2316 if os.getuid() != 0: 2317 self.assertRaises(OSError, os.setuid, 0) 2318 self.assertRaises(TypeError, os.setuid, 'not an int') 2319 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW) 2320 2321 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') 2322 def test_setgid(self): 2323 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2324 self.assertRaises(OSError, os.setgid, 0) 2325 self.assertRaises(TypeError, os.setgid, 'not an int') 2326 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW) 2327 2328 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') 2329 def test_seteuid(self): 2330 if os.getuid() != 0: 2331 self.assertRaises(OSError, os.seteuid, 0) 2332 self.assertRaises(TypeError, os.setegid, 'not an int') 2333 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW) 2334 2335 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') 2336 def test_setegid(self): 2337 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2338 self.assertRaises(OSError, os.setegid, 0) 2339 self.assertRaises(TypeError, os.setegid, 'not an int') 2340 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW) 2341 2342 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 2343 def test_setreuid(self): 2344 if os.getuid() != 0: 2345 self.assertRaises(OSError, os.setreuid, 0, 0) 2346 self.assertRaises(TypeError, os.setreuid, 'not an int', 0) 2347 self.assertRaises(TypeError, os.setreuid, 0, 'not an int') 2348 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0) 2349 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW) 2350 2351 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 2352 @support.requires_subprocess() 2353 def test_setreuid_neg1(self): 2354 # Needs to accept -1. We run this in a subprocess to avoid 2355 # altering the test runner's process state (issue8045). 2356 subprocess.check_call([ 2357 sys.executable, '-c', 2358 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) 2359 2360 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 2361 @support.requires_subprocess() 2362 def test_setregid(self): 2363 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2364 self.assertRaises(OSError, os.setregid, 0, 0) 2365 self.assertRaises(TypeError, os.setregid, 'not an int', 0) 2366 self.assertRaises(TypeError, os.setregid, 0, 'not an int') 2367 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0) 2368 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW) 2369 2370 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 2371 @support.requires_subprocess() 2372 def test_setregid_neg1(self): 2373 # Needs to accept -1. We run this in a subprocess to avoid 2374 # altering the test runner's process state (issue8045). 2375 subprocess.check_call([ 2376 sys.executable, '-c', 2377 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) 2378 2379@unittest.skipIf(sys.platform == "win32", "Posix specific tests") 2380class Pep383Tests(unittest.TestCase): 2381 def setUp(self): 2382 if os_helper.TESTFN_UNENCODABLE: 2383 self.dir = os_helper.TESTFN_UNENCODABLE 2384 elif os_helper.TESTFN_NONASCII: 2385 self.dir = os_helper.TESTFN_NONASCII 2386 else: 2387 self.dir = os_helper.TESTFN 2388 self.bdir = os.fsencode(self.dir) 2389 2390 bytesfn = [] 2391 def add_filename(fn): 2392 try: 2393 fn = os.fsencode(fn) 2394 except UnicodeEncodeError: 2395 return 2396 bytesfn.append(fn) 2397 add_filename(os_helper.TESTFN_UNICODE) 2398 if os_helper.TESTFN_UNENCODABLE: 2399 add_filename(os_helper.TESTFN_UNENCODABLE) 2400 if os_helper.TESTFN_NONASCII: 2401 add_filename(os_helper.TESTFN_NONASCII) 2402 if not bytesfn: 2403 self.skipTest("couldn't create any non-ascii filename") 2404 2405 self.unicodefn = set() 2406 os.mkdir(self.dir) 2407 try: 2408 for fn in bytesfn: 2409 os_helper.create_empty_file(os.path.join(self.bdir, fn)) 2410 fn = os.fsdecode(fn) 2411 if fn in self.unicodefn: 2412 raise ValueError("duplicate filename") 2413 self.unicodefn.add(fn) 2414 except: 2415 shutil.rmtree(self.dir) 2416 raise 2417 2418 def tearDown(self): 2419 shutil.rmtree(self.dir) 2420 2421 def test_listdir(self): 2422 expected = self.unicodefn 2423 found = set(os.listdir(self.dir)) 2424 self.assertEqual(found, expected) 2425 # test listdir without arguments 2426 current_directory = os.getcwd() 2427 try: 2428 os.chdir(os.sep) 2429 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) 2430 finally: 2431 os.chdir(current_directory) 2432 2433 def test_open(self): 2434 for fn in self.unicodefn: 2435 f = open(os.path.join(self.dir, fn), 'rb') 2436 f.close() 2437 2438 @unittest.skipUnless(hasattr(os, 'statvfs'), 2439 "need os.statvfs()") 2440 def test_statvfs(self): 2441 # issue #9645 2442 for fn in self.unicodefn: 2443 # should not fail with file not found error 2444 fullname = os.path.join(self.dir, fn) 2445 os.statvfs(fullname) 2446 2447 def test_stat(self): 2448 for fn in self.unicodefn: 2449 os.stat(os.path.join(self.dir, fn)) 2450 2451@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2452class Win32KillTests(unittest.TestCase): 2453 def _kill(self, sig): 2454 # Start sys.executable as a subprocess and communicate from the 2455 # subprocess to the parent that the interpreter is ready. When it 2456 # becomes ready, send *sig* via os.kill to the subprocess and check 2457 # that the return code is equal to *sig*. 2458 import ctypes 2459 from ctypes import wintypes 2460 import msvcrt 2461 2462 # Since we can't access the contents of the process' stdout until the 2463 # process has exited, use PeekNamedPipe to see what's inside stdout 2464 # without waiting. This is done so we can tell that the interpreter 2465 # is started and running at a point where it could handle a signal. 2466 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe 2467 PeekNamedPipe.restype = wintypes.BOOL 2468 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle 2469 ctypes.POINTER(ctypes.c_char), # stdout buf 2470 wintypes.DWORD, # Buffer size 2471 ctypes.POINTER(wintypes.DWORD), # bytes read 2472 ctypes.POINTER(wintypes.DWORD), # bytes avail 2473 ctypes.POINTER(wintypes.DWORD)) # bytes left 2474 msg = "running" 2475 proc = subprocess.Popen([sys.executable, "-c", 2476 "import sys;" 2477 "sys.stdout.write('{}');" 2478 "sys.stdout.flush();" 2479 "input()".format(msg)], 2480 stdout=subprocess.PIPE, 2481 stderr=subprocess.PIPE, 2482 stdin=subprocess.PIPE) 2483 self.addCleanup(proc.stdout.close) 2484 self.addCleanup(proc.stderr.close) 2485 self.addCleanup(proc.stdin.close) 2486 2487 count, max = 0, 100 2488 while count < max and proc.poll() is None: 2489 # Create a string buffer to store the result of stdout from the pipe 2490 buf = ctypes.create_string_buffer(len(msg)) 2491 # Obtain the text currently in proc.stdout 2492 # Bytes read/avail/left are left as NULL and unused 2493 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()), 2494 buf, ctypes.sizeof(buf), None, None, None) 2495 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed") 2496 if buf.value: 2497 self.assertEqual(msg, buf.value.decode()) 2498 break 2499 time.sleep(0.1) 2500 count += 1 2501 else: 2502 self.fail("Did not receive communication from the subprocess") 2503 2504 os.kill(proc.pid, sig) 2505 self.assertEqual(proc.wait(), sig) 2506 2507 def test_kill_sigterm(self): 2508 # SIGTERM doesn't mean anything special, but make sure it works 2509 self._kill(signal.SIGTERM) 2510 2511 def test_kill_int(self): 2512 # os.kill on Windows can take an int which gets set as the exit code 2513 self._kill(100) 2514 2515 @unittest.skipIf(mmap is None, "requires mmap") 2516 def _kill_with_event(self, event, name): 2517 tagname = "test_os_%s" % uuid.uuid1() 2518 m = mmap.mmap(-1, 1, tagname) 2519 m[0] = 0 2520 # Run a script which has console control handling enabled. 2521 proc = subprocess.Popen([sys.executable, 2522 os.path.join(os.path.dirname(__file__), 2523 "win_console_handler.py"), tagname], 2524 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) 2525 # Let the interpreter startup before we send signals. See #3137. 2526 count, max = 0, 100 2527 while count < max and proc.poll() is None: 2528 if m[0] == 1: 2529 break 2530 time.sleep(0.1) 2531 count += 1 2532 else: 2533 # Forcefully kill the process if we weren't able to signal it. 2534 os.kill(proc.pid, signal.SIGINT) 2535 self.fail("Subprocess didn't finish initialization") 2536 os.kill(proc.pid, event) 2537 # proc.send_signal(event) could also be done here. 2538 # Allow time for the signal to be passed and the process to exit. 2539 time.sleep(0.5) 2540 if not proc.poll(): 2541 # Forcefully kill the process if we weren't able to signal it. 2542 os.kill(proc.pid, signal.SIGINT) 2543 self.fail("subprocess did not stop on {}".format(name)) 2544 2545 @unittest.skip("subprocesses aren't inheriting Ctrl+C property") 2546 @support.requires_subprocess() 2547 def test_CTRL_C_EVENT(self): 2548 from ctypes import wintypes 2549 import ctypes 2550 2551 # Make a NULL value by creating a pointer with no argument. 2552 NULL = ctypes.POINTER(ctypes.c_int)() 2553 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler 2554 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), 2555 wintypes.BOOL) 2556 SetConsoleCtrlHandler.restype = wintypes.BOOL 2557 2558 # Calling this with NULL and FALSE causes the calling process to 2559 # handle Ctrl+C, rather than ignore it. This property is inherited 2560 # by subprocesses. 2561 SetConsoleCtrlHandler(NULL, 0) 2562 2563 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") 2564 2565 @support.requires_subprocess() 2566 def test_CTRL_BREAK_EVENT(self): 2567 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") 2568 2569 2570@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2571class Win32ListdirTests(unittest.TestCase): 2572 """Test listdir on Windows.""" 2573 2574 def setUp(self): 2575 self.created_paths = [] 2576 for i in range(2): 2577 dir_name = 'SUB%d' % i 2578 dir_path = os.path.join(os_helper.TESTFN, dir_name) 2579 file_name = 'FILE%d' % i 2580 file_path = os.path.join(os_helper.TESTFN, file_name) 2581 os.makedirs(dir_path) 2582 with open(file_path, 'w', encoding='utf-8') as f: 2583 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) 2584 self.created_paths.extend([dir_name, file_name]) 2585 self.created_paths.sort() 2586 2587 def tearDown(self): 2588 shutil.rmtree(os_helper.TESTFN) 2589 2590 def test_listdir_no_extended_path(self): 2591 """Test when the path is not an "extended" path.""" 2592 # unicode 2593 self.assertEqual( 2594 sorted(os.listdir(os_helper.TESTFN)), 2595 self.created_paths) 2596 2597 # bytes 2598 self.assertEqual( 2599 sorted(os.listdir(os.fsencode(os_helper.TESTFN))), 2600 [os.fsencode(path) for path in self.created_paths]) 2601 2602 def test_listdir_extended_path(self): 2603 """Test when the path starts with '\\\\?\\'.""" 2604 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath 2605 # unicode 2606 path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN) 2607 self.assertEqual( 2608 sorted(os.listdir(path)), 2609 self.created_paths) 2610 2611 # bytes 2612 path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN)) 2613 self.assertEqual( 2614 sorted(os.listdir(path)), 2615 [os.fsencode(path) for path in self.created_paths]) 2616 2617 2618@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()') 2619class ReadlinkTests(unittest.TestCase): 2620 filelink = 'readlinktest' 2621 filelink_target = os.path.abspath(__file__) 2622 filelinkb = os.fsencode(filelink) 2623 filelinkb_target = os.fsencode(filelink_target) 2624 2625 def assertPathEqual(self, left, right): 2626 left = os.path.normcase(left) 2627 right = os.path.normcase(right) 2628 if sys.platform == 'win32': 2629 # Bad practice to blindly strip the prefix as it may be required to 2630 # correctly refer to the file, but we're only comparing paths here. 2631 has_prefix = lambda p: p.startswith( 2632 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\') 2633 if has_prefix(left): 2634 left = left[4:] 2635 if has_prefix(right): 2636 right = right[4:] 2637 self.assertEqual(left, right) 2638 2639 def setUp(self): 2640 self.assertTrue(os.path.exists(self.filelink_target)) 2641 self.assertTrue(os.path.exists(self.filelinkb_target)) 2642 self.assertFalse(os.path.exists(self.filelink)) 2643 self.assertFalse(os.path.exists(self.filelinkb)) 2644 2645 def test_not_symlink(self): 2646 filelink_target = FakePath(self.filelink_target) 2647 self.assertRaises(OSError, os.readlink, self.filelink_target) 2648 self.assertRaises(OSError, os.readlink, filelink_target) 2649 2650 def test_missing_link(self): 2651 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link') 2652 self.assertRaises(FileNotFoundError, os.readlink, 2653 FakePath('missing-link')) 2654 2655 @os_helper.skip_unless_symlink 2656 def test_pathlike(self): 2657 os.symlink(self.filelink_target, self.filelink) 2658 self.addCleanup(os_helper.unlink, self.filelink) 2659 filelink = FakePath(self.filelink) 2660 self.assertPathEqual(os.readlink(filelink), self.filelink_target) 2661 2662 @os_helper.skip_unless_symlink 2663 def test_pathlike_bytes(self): 2664 os.symlink(self.filelinkb_target, self.filelinkb) 2665 self.addCleanup(os_helper.unlink, self.filelinkb) 2666 path = os.readlink(FakePath(self.filelinkb)) 2667 self.assertPathEqual(path, self.filelinkb_target) 2668 self.assertIsInstance(path, bytes) 2669 2670 @os_helper.skip_unless_symlink 2671 def test_bytes(self): 2672 os.symlink(self.filelinkb_target, self.filelinkb) 2673 self.addCleanup(os_helper.unlink, self.filelinkb) 2674 path = os.readlink(self.filelinkb) 2675 self.assertPathEqual(path, self.filelinkb_target) 2676 self.assertIsInstance(path, bytes) 2677 2678 2679@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2680@os_helper.skip_unless_symlink 2681class Win32SymlinkTests(unittest.TestCase): 2682 filelink = 'filelinktest' 2683 filelink_target = os.path.abspath(__file__) 2684 dirlink = 'dirlinktest' 2685 dirlink_target = os.path.dirname(filelink_target) 2686 missing_link = 'missing link' 2687 2688 def setUp(self): 2689 assert os.path.exists(self.dirlink_target) 2690 assert os.path.exists(self.filelink_target) 2691 assert not os.path.exists(self.dirlink) 2692 assert not os.path.exists(self.filelink) 2693 assert not os.path.exists(self.missing_link) 2694 2695 def tearDown(self): 2696 if os.path.exists(self.filelink): 2697 os.remove(self.filelink) 2698 if os.path.exists(self.dirlink): 2699 os.rmdir(self.dirlink) 2700 if os.path.lexists(self.missing_link): 2701 os.remove(self.missing_link) 2702 2703 def test_directory_link(self): 2704 os.symlink(self.dirlink_target, self.dirlink) 2705 self.assertTrue(os.path.exists(self.dirlink)) 2706 self.assertTrue(os.path.isdir(self.dirlink)) 2707 self.assertTrue(os.path.islink(self.dirlink)) 2708 self.check_stat(self.dirlink, self.dirlink_target) 2709 2710 def test_file_link(self): 2711 os.symlink(self.filelink_target, self.filelink) 2712 self.assertTrue(os.path.exists(self.filelink)) 2713 self.assertTrue(os.path.isfile(self.filelink)) 2714 self.assertTrue(os.path.islink(self.filelink)) 2715 self.check_stat(self.filelink, self.filelink_target) 2716 2717 def _create_missing_dir_link(self): 2718 'Create a "directory" link to a non-existent target' 2719 linkname = self.missing_link 2720 if os.path.lexists(linkname): 2721 os.remove(linkname) 2722 target = r'c:\\target does not exist.29r3c740' 2723 assert not os.path.exists(target) 2724 target_is_dir = True 2725 os.symlink(target, linkname, target_is_dir) 2726 2727 def test_remove_directory_link_to_missing_target(self): 2728 self._create_missing_dir_link() 2729 # For compatibility with Unix, os.remove will check the 2730 # directory status and call RemoveDirectory if the symlink 2731 # was created with target_is_dir==True. 2732 os.remove(self.missing_link) 2733 2734 def test_isdir_on_directory_link_to_missing_target(self): 2735 self._create_missing_dir_link() 2736 self.assertFalse(os.path.isdir(self.missing_link)) 2737 2738 def test_rmdir_on_directory_link_to_missing_target(self): 2739 self._create_missing_dir_link() 2740 os.rmdir(self.missing_link) 2741 2742 def check_stat(self, link, target): 2743 self.assertEqual(os.stat(link), os.stat(target)) 2744 self.assertNotEqual(os.lstat(link), os.stat(link)) 2745 2746 bytes_link = os.fsencode(link) 2747 self.assertEqual(os.stat(bytes_link), os.stat(target)) 2748 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) 2749 2750 def test_12084(self): 2751 level1 = os.path.abspath(os_helper.TESTFN) 2752 level2 = os.path.join(level1, "level2") 2753 level3 = os.path.join(level2, "level3") 2754 self.addCleanup(os_helper.rmtree, level1) 2755 2756 os.mkdir(level1) 2757 os.mkdir(level2) 2758 os.mkdir(level3) 2759 2760 file1 = os.path.abspath(os.path.join(level1, "file1")) 2761 create_file(file1) 2762 2763 orig_dir = os.getcwd() 2764 try: 2765 os.chdir(level2) 2766 link = os.path.join(level2, "link") 2767 os.symlink(os.path.relpath(file1), "link") 2768 self.assertIn("link", os.listdir(os.getcwd())) 2769 2770 # Check os.stat calls from the same dir as the link 2771 self.assertEqual(os.stat(file1), os.stat("link")) 2772 2773 # Check os.stat calls from a dir below the link 2774 os.chdir(level1) 2775 self.assertEqual(os.stat(file1), 2776 os.stat(os.path.relpath(link))) 2777 2778 # Check os.stat calls from a dir above the link 2779 os.chdir(level3) 2780 self.assertEqual(os.stat(file1), 2781 os.stat(os.path.relpath(link))) 2782 finally: 2783 os.chdir(orig_dir) 2784 2785 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users') 2786 and os.path.exists(r'C:\ProgramData'), 2787 'Test directories not found') 2788 def test_29248(self): 2789 # os.symlink() calls CreateSymbolicLink, which creates 2790 # the reparse data buffer with the print name stored 2791 # first, so the offset is always 0. CreateSymbolicLink 2792 # stores the "PrintName" DOS path (e.g. "C:\") first, 2793 # with an offset of 0, followed by the "SubstituteName" 2794 # NT path (e.g. "\??\C:\"). The "All Users" link, on 2795 # the other hand, seems to have been created manually 2796 # with an inverted order. 2797 target = os.readlink(r'C:\Users\All Users') 2798 self.assertTrue(os.path.samefile(target, r'C:\ProgramData')) 2799 2800 def test_buffer_overflow(self): 2801 # Older versions would have a buffer overflow when detecting 2802 # whether a link source was a directory. This test ensures we 2803 # no longer crash, but does not otherwise validate the behavior 2804 segment = 'X' * 27 2805 path = os.path.join(*[segment] * 10) 2806 test_cases = [ 2807 # overflow with absolute src 2808 ('\\' + path, segment), 2809 # overflow dest with relative src 2810 (segment, path), 2811 # overflow when joining src 2812 (path[:180], path[:180]), 2813 ] 2814 for src, dest in test_cases: 2815 try: 2816 os.symlink(src, dest) 2817 except FileNotFoundError: 2818 pass 2819 else: 2820 try: 2821 os.remove(dest) 2822 except OSError: 2823 pass 2824 # Also test with bytes, since that is a separate code path. 2825 try: 2826 os.symlink(os.fsencode(src), os.fsencode(dest)) 2827 except FileNotFoundError: 2828 pass 2829 else: 2830 try: 2831 os.remove(dest) 2832 except OSError: 2833 pass 2834 2835 def test_appexeclink(self): 2836 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps') 2837 if not os.path.isdir(root): 2838 self.skipTest("test requires a WindowsApps directory") 2839 2840 aliases = [os.path.join(root, a) 2841 for a in fnmatch.filter(os.listdir(root), '*.exe')] 2842 2843 for alias in aliases: 2844 if support.verbose: 2845 print() 2846 print("Testing with", alias) 2847 st = os.lstat(alias) 2848 self.assertEqual(st, os.stat(alias)) 2849 self.assertFalse(stat.S_ISLNK(st.st_mode)) 2850 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK) 2851 # testing the first one we see is sufficient 2852 break 2853 else: 2854 self.skipTest("test requires an app execution alias") 2855 2856@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2857class Win32JunctionTests(unittest.TestCase): 2858 junction = 'junctiontest' 2859 junction_target = os.path.dirname(os.path.abspath(__file__)) 2860 2861 def setUp(self): 2862 assert os.path.exists(self.junction_target) 2863 assert not os.path.lexists(self.junction) 2864 2865 def tearDown(self): 2866 if os.path.lexists(self.junction): 2867 os.unlink(self.junction) 2868 2869 def test_create_junction(self): 2870 _winapi.CreateJunction(self.junction_target, self.junction) 2871 self.assertTrue(os.path.lexists(self.junction)) 2872 self.assertTrue(os.path.exists(self.junction)) 2873 self.assertTrue(os.path.isdir(self.junction)) 2874 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction)) 2875 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target)) 2876 2877 # bpo-37834: Junctions are not recognized as links. 2878 self.assertFalse(os.path.islink(self.junction)) 2879 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target), 2880 os.path.normcase(os.readlink(self.junction))) 2881 2882 def test_unlink_removes_junction(self): 2883 _winapi.CreateJunction(self.junction_target, self.junction) 2884 self.assertTrue(os.path.exists(self.junction)) 2885 self.assertTrue(os.path.lexists(self.junction)) 2886 2887 os.unlink(self.junction) 2888 self.assertFalse(os.path.exists(self.junction)) 2889 2890@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2891class Win32NtTests(unittest.TestCase): 2892 def test_getfinalpathname_handles(self): 2893 nt = import_helper.import_module('nt') 2894 ctypes = import_helper.import_module('ctypes') 2895 import ctypes.wintypes 2896 2897 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True) 2898 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE 2899 2900 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL 2901 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE, 2902 ctypes.wintypes.LPDWORD) 2903 2904 # This is a pseudo-handle that doesn't need to be closed 2905 hproc = kernel.GetCurrentProcess() 2906 2907 handle_count = ctypes.wintypes.DWORD() 2908 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2909 self.assertEqual(1, ok) 2910 2911 before_count = handle_count.value 2912 2913 # The first two test the error path, __file__ tests the success path 2914 filenames = [ 2915 r'\\?\C:', 2916 r'\\?\NUL', 2917 r'\\?\CONIN', 2918 __file__, 2919 ] 2920 2921 for _ in range(10): 2922 for name in filenames: 2923 try: 2924 nt._getfinalpathname(name) 2925 except Exception: 2926 # Failure is expected 2927 pass 2928 try: 2929 os.stat(name) 2930 except Exception: 2931 pass 2932 2933 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2934 self.assertEqual(1, ok) 2935 2936 handle_delta = handle_count.value - before_count 2937 2938 self.assertEqual(0, handle_delta) 2939 2940 @support.requires_subprocess() 2941 def test_stat_unlink_race(self): 2942 # bpo-46785: the implementation of os.stat() falls back to reading 2943 # the parent directory if CreateFileW() fails with a permission 2944 # error. If reading the parent directory fails because the file or 2945 # directory are subsequently unlinked, or because the volume or 2946 # share are no longer available, then the original permission error 2947 # should not be restored. 2948 filename = os_helper.TESTFN 2949 self.addCleanup(os_helper.unlink, filename) 2950 deadline = time.time() + 5 2951 command = textwrap.dedent("""\ 2952 import os 2953 import sys 2954 import time 2955 2956 filename = sys.argv[1] 2957 deadline = float(sys.argv[2]) 2958 2959 while time.time() < deadline: 2960 try: 2961 with open(filename, "w") as f: 2962 pass 2963 except OSError: 2964 pass 2965 try: 2966 os.remove(filename) 2967 except OSError: 2968 pass 2969 """) 2970 2971 with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc: 2972 while time.time() < deadline: 2973 try: 2974 os.stat(filename) 2975 except FileNotFoundError as e: 2976 assert e.winerror == 2 # ERROR_FILE_NOT_FOUND 2977 try: 2978 proc.wait(1) 2979 except subprocess.TimeoutExpired: 2980 proc.terminate() 2981 2982 2983@os_helper.skip_unless_symlink 2984class NonLocalSymlinkTests(unittest.TestCase): 2985 2986 def setUp(self): 2987 r""" 2988 Create this structure: 2989 2990 base 2991 \___ some_dir 2992 """ 2993 os.makedirs('base/some_dir') 2994 2995 def tearDown(self): 2996 shutil.rmtree('base') 2997 2998 def test_directory_link_nonlocal(self): 2999 """ 3000 The symlink target should resolve relative to the link, not relative 3001 to the current directory. 3002 3003 Then, link base/some_link -> base/some_dir and ensure that some_link 3004 is resolved as a directory. 3005 3006 In issue13772, it was discovered that directory detection failed if 3007 the symlink target was not specified relative to the current 3008 directory, which was a defect in the implementation. 3009 """ 3010 src = os.path.join('base', 'some_link') 3011 os.symlink('some_dir', src) 3012 assert os.path.isdir(src) 3013 3014 3015class FSEncodingTests(unittest.TestCase): 3016 def test_nop(self): 3017 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff') 3018 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141') 3019 3020 def test_identity(self): 3021 # assert fsdecode(fsencode(x)) == x 3022 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'): 3023 try: 3024 bytesfn = os.fsencode(fn) 3025 except UnicodeEncodeError: 3026 continue 3027 self.assertEqual(os.fsdecode(bytesfn), fn) 3028 3029 3030 3031class DeviceEncodingTests(unittest.TestCase): 3032 3033 def test_bad_fd(self): 3034 # Return None when an fd doesn't actually exist. 3035 self.assertIsNone(os.device_encoding(123456)) 3036 3037 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or 3038 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))), 3039 'test requires a tty and either Windows or nl_langinfo(CODESET)') 3040 @unittest.skipIf( 3041 support.is_emscripten, "Cannot get encoding of stdin on Emscripten" 3042 ) 3043 def test_device_encoding(self): 3044 encoding = os.device_encoding(0) 3045 self.assertIsNotNone(encoding) 3046 self.assertTrue(codecs.lookup(encoding)) 3047 3048 3049@support.requires_subprocess() 3050class PidTests(unittest.TestCase): 3051 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid") 3052 def test_getppid(self): 3053 p = subprocess.Popen([sys.executable, '-c', 3054 'import os; print(os.getppid())'], 3055 stdout=subprocess.PIPE) 3056 stdout, _ = p.communicate() 3057 # We are the parent of our subprocess 3058 self.assertEqual(int(stdout), os.getpid()) 3059 3060 def check_waitpid(self, code, exitcode, callback=None): 3061 if sys.platform == 'win32': 3062 # On Windows, os.spawnv() simply joins arguments with spaces: 3063 # arguments need to be quoted 3064 args = [f'"{sys.executable}"', '-c', f'"{code}"'] 3065 else: 3066 args = [sys.executable, '-c', code] 3067 pid = os.spawnv(os.P_NOWAIT, sys.executable, args) 3068 3069 if callback is not None: 3070 callback(pid) 3071 3072 # don't use support.wait_process() to test directly os.waitpid() 3073 # and os.waitstatus_to_exitcode() 3074 pid2, status = os.waitpid(pid, 0) 3075 self.assertEqual(os.waitstatus_to_exitcode(status), exitcode) 3076 self.assertEqual(pid2, pid) 3077 3078 def test_waitpid(self): 3079 self.check_waitpid(code='pass', exitcode=0) 3080 3081 def test_waitstatus_to_exitcode(self): 3082 exitcode = 23 3083 code = f'import sys; sys.exit({exitcode})' 3084 self.check_waitpid(code, exitcode=exitcode) 3085 3086 with self.assertRaises(TypeError): 3087 os.waitstatus_to_exitcode(0.0) 3088 3089 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 3090 def test_waitpid_windows(self): 3091 # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode() 3092 # with exit code larger than INT_MAX. 3093 STATUS_CONTROL_C_EXIT = 0xC000013A 3094 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})' 3095 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT) 3096 3097 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 3098 def test_waitstatus_to_exitcode_windows(self): 3099 max_exitcode = 2 ** 32 - 1 3100 for exitcode in (0, 1, 5, max_exitcode): 3101 self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8), 3102 exitcode) 3103 3104 # invalid values 3105 with self.assertRaises(ValueError): 3106 os.waitstatus_to_exitcode((max_exitcode + 1) << 8) 3107 with self.assertRaises(OverflowError): 3108 os.waitstatus_to_exitcode(-1) 3109 3110 # Skip the test on Windows 3111 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL') 3112 def test_waitstatus_to_exitcode_kill(self): 3113 code = f'import time; time.sleep({support.LONG_TIMEOUT})' 3114 signum = signal.SIGKILL 3115 3116 def kill_process(pid): 3117 os.kill(pid, signum) 3118 3119 self.check_waitpid(code, exitcode=-signum, callback=kill_process) 3120 3121 3122@support.requires_subprocess() 3123class SpawnTests(unittest.TestCase): 3124 @staticmethod 3125 def quote_args(args): 3126 # On Windows, os.spawn* simply joins arguments with spaces: 3127 # arguments need to be quoted 3128 if os.name != 'nt': 3129 return args 3130 return [f'"{arg}"' if " " in arg.strip() else arg for arg in args] 3131 3132 def create_args(self, *, with_env=False, use_bytes=False): 3133 self.exitcode = 17 3134 3135 filename = os_helper.TESTFN 3136 self.addCleanup(os_helper.unlink, filename) 3137 3138 if not with_env: 3139 code = 'import sys; sys.exit(%s)' % self.exitcode 3140 else: 3141 self.env = dict(os.environ) 3142 # create an unique key 3143 self.key = str(uuid.uuid4()) 3144 self.env[self.key] = self.key 3145 # read the variable from os.environ to check that it exists 3146 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)' 3147 % (self.key, self.exitcode)) 3148 3149 with open(filename, "w", encoding="utf-8") as fp: 3150 fp.write(code) 3151 3152 program = sys.executable 3153 args = self.quote_args([program, filename]) 3154 if use_bytes: 3155 program = os.fsencode(program) 3156 args = [os.fsencode(a) for a in args] 3157 self.env = {os.fsencode(k): os.fsencode(v) 3158 for k, v in self.env.items()} 3159 3160 return program, args 3161 3162 @requires_os_func('spawnl') 3163 def test_spawnl(self): 3164 program, args = self.create_args() 3165 exitcode = os.spawnl(os.P_WAIT, program, *args) 3166 self.assertEqual(exitcode, self.exitcode) 3167 3168 @requires_os_func('spawnle') 3169 def test_spawnle(self): 3170 program, args = self.create_args(with_env=True) 3171 exitcode = os.spawnle(os.P_WAIT, program, *args, self.env) 3172 self.assertEqual(exitcode, self.exitcode) 3173 3174 @requires_os_func('spawnlp') 3175 def test_spawnlp(self): 3176 program, args = self.create_args() 3177 exitcode = os.spawnlp(os.P_WAIT, program, *args) 3178 self.assertEqual(exitcode, self.exitcode) 3179 3180 @requires_os_func('spawnlpe') 3181 def test_spawnlpe(self): 3182 program, args = self.create_args(with_env=True) 3183 exitcode = os.spawnlpe(os.P_WAIT, program, *args, self.env) 3184 self.assertEqual(exitcode, self.exitcode) 3185 3186 @requires_os_func('spawnv') 3187 def test_spawnv(self): 3188 program, args = self.create_args() 3189 exitcode = os.spawnv(os.P_WAIT, program, args) 3190 self.assertEqual(exitcode, self.exitcode) 3191 3192 # Test for PyUnicode_FSConverter() 3193 exitcode = os.spawnv(os.P_WAIT, FakePath(program), args) 3194 self.assertEqual(exitcode, self.exitcode) 3195 3196 @requires_os_func('spawnve') 3197 def test_spawnve(self): 3198 program, args = self.create_args(with_env=True) 3199 exitcode = os.spawnve(os.P_WAIT, program, args, self.env) 3200 self.assertEqual(exitcode, self.exitcode) 3201 3202 @requires_os_func('spawnvp') 3203 def test_spawnvp(self): 3204 program, args = self.create_args() 3205 exitcode = os.spawnvp(os.P_WAIT, program, args) 3206 self.assertEqual(exitcode, self.exitcode) 3207 3208 @requires_os_func('spawnvpe') 3209 def test_spawnvpe(self): 3210 program, args = self.create_args(with_env=True) 3211 exitcode = os.spawnvpe(os.P_WAIT, program, args, self.env) 3212 self.assertEqual(exitcode, self.exitcode) 3213 3214 @requires_os_func('spawnv') 3215 def test_nowait(self): 3216 program, args = self.create_args() 3217 pid = os.spawnv(os.P_NOWAIT, program, args) 3218 support.wait_process(pid, exitcode=self.exitcode) 3219 3220 @requires_os_func('spawnve') 3221 def test_spawnve_bytes(self): 3222 # Test bytes handling in parse_arglist and parse_envlist (#28114) 3223 program, args = self.create_args(with_env=True, use_bytes=True) 3224 exitcode = os.spawnve(os.P_WAIT, program, args, self.env) 3225 self.assertEqual(exitcode, self.exitcode) 3226 3227 @requires_os_func('spawnl') 3228 def test_spawnl_noargs(self): 3229 program, __ = self.create_args() 3230 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program) 3231 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program, '') 3232 3233 @requires_os_func('spawnle') 3234 def test_spawnle_noargs(self): 3235 program, __ = self.create_args() 3236 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, {}) 3237 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, '', {}) 3238 3239 @requires_os_func('spawnv') 3240 def test_spawnv_noargs(self): 3241 program, __ = self.create_args() 3242 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ()) 3243 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, []) 3244 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ('',)) 3245 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ['']) 3246 3247 @requires_os_func('spawnve') 3248 def test_spawnve_noargs(self): 3249 program, __ = self.create_args() 3250 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, (), {}) 3251 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, [], {}) 3252 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, ('',), {}) 3253 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, [''], {}) 3254 3255 def _test_invalid_env(self, spawn): 3256 program = sys.executable 3257 args = self.quote_args([program, '-c', 'pass']) 3258 3259 # null character in the environment variable name 3260 newenv = os.environ.copy() 3261 newenv["FRUIT\0VEGETABLE"] = "cabbage" 3262 try: 3263 exitcode = spawn(os.P_WAIT, program, args, newenv) 3264 except ValueError: 3265 pass 3266 else: 3267 self.assertEqual(exitcode, 127) 3268 3269 # null character in the environment variable value 3270 newenv = os.environ.copy() 3271 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 3272 try: 3273 exitcode = spawn(os.P_WAIT, program, args, newenv) 3274 except ValueError: 3275 pass 3276 else: 3277 self.assertEqual(exitcode, 127) 3278 3279 # equal character in the environment variable name 3280 newenv = os.environ.copy() 3281 newenv["FRUIT=ORANGE"] = "lemon" 3282 try: 3283 exitcode = spawn(os.P_WAIT, program, args, newenv) 3284 except ValueError: 3285 pass 3286 else: 3287 self.assertEqual(exitcode, 127) 3288 3289 # equal character in the environment variable value 3290 filename = os_helper.TESTFN 3291 self.addCleanup(os_helper.unlink, filename) 3292 with open(filename, "w", encoding="utf-8") as fp: 3293 fp.write('import sys, os\n' 3294 'if os.getenv("FRUIT") != "orange=lemon":\n' 3295 ' raise AssertionError') 3296 3297 args = self.quote_args([program, filename]) 3298 newenv = os.environ.copy() 3299 newenv["FRUIT"] = "orange=lemon" 3300 exitcode = spawn(os.P_WAIT, program, args, newenv) 3301 self.assertEqual(exitcode, 0) 3302 3303 @requires_os_func('spawnve') 3304 def test_spawnve_invalid_env(self): 3305 self._test_invalid_env(os.spawnve) 3306 3307 @requires_os_func('spawnvpe') 3308 def test_spawnvpe_invalid_env(self): 3309 self._test_invalid_env(os.spawnvpe) 3310 3311 3312# The introduction of this TestCase caused at least two different errors on 3313# *nix buildbots. Temporarily skip this to let the buildbots move along. 3314@unittest.skip("Skip due to platform/environment differences on *NIX buildbots") 3315@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin") 3316class LoginTests(unittest.TestCase): 3317 def test_getlogin(self): 3318 user_name = os.getlogin() 3319 self.assertNotEqual(len(user_name), 0) 3320 3321 3322@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'), 3323 "needs os.getpriority and os.setpriority") 3324class ProgramPriorityTests(unittest.TestCase): 3325 """Tests for os.getpriority() and os.setpriority().""" 3326 3327 def test_set_get_priority(self): 3328 3329 base = os.getpriority(os.PRIO_PROCESS, os.getpid()) 3330 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1) 3331 try: 3332 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid()) 3333 if base >= 19 and new_prio <= 19: 3334 raise unittest.SkipTest("unable to reliably test setpriority " 3335 "at current nice level of %s" % base) 3336 else: 3337 self.assertEqual(new_prio, base + 1) 3338 finally: 3339 try: 3340 os.setpriority(os.PRIO_PROCESS, os.getpid(), base) 3341 except OSError as err: 3342 if err.errno != errno.EACCES: 3343 raise 3344 3345 3346@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()") 3347class TestSendfile(unittest.IsolatedAsyncioTestCase): 3348 3349 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 3350 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ 3351 not sys.platform.startswith("solaris") and \ 3352 not sys.platform.startswith("sunos") 3353 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS, 3354 'requires headers and trailers support') 3355 requires_32b = unittest.skipUnless(sys.maxsize < 2**32, 3356 'test is only meaningful on 32-bit builds') 3357 3358 @classmethod 3359 def setUpClass(cls): 3360 create_file(os_helper.TESTFN, cls.DATA) 3361 3362 @classmethod 3363 def tearDownClass(cls): 3364 os_helper.unlink(os_helper.TESTFN) 3365 3366 @staticmethod 3367 async def chunks(reader): 3368 while not reader.at_eof(): 3369 yield await reader.read() 3370 3371 async def handle_new_client(self, reader, writer): 3372 self.server_buffer = b''.join([x async for x in self.chunks(reader)]) 3373 writer.close() 3374 self.server.close() # The test server processes a single client only 3375 3376 async def asyncSetUp(self): 3377 self.server_buffer = b'' 3378 self.server = await asyncio.start_server(self.handle_new_client, 3379 socket_helper.HOSTv4) 3380 server_name = self.server.sockets[0].getsockname() 3381 self.client = socket.socket() 3382 self.client.setblocking(False) 3383 await asyncio.get_running_loop().sock_connect(self.client, server_name) 3384 self.sockno = self.client.fileno() 3385 self.file = open(os_helper.TESTFN, 'rb') 3386 self.fileno = self.file.fileno() 3387 3388 async def asyncTearDown(self): 3389 self.file.close() 3390 self.client.close() 3391 await self.server.wait_closed() 3392 3393 # Use the test subject instead of asyncio.loop.sendfile 3394 @staticmethod 3395 async def async_sendfile(*args, **kwargs): 3396 return await asyncio.to_thread(os.sendfile, *args, **kwargs) 3397 3398 @staticmethod 3399 async def sendfile_wrapper(*args, **kwargs): 3400 """A higher level wrapper representing how an application is 3401 supposed to use sendfile(). 3402 """ 3403 while True: 3404 try: 3405 return await TestSendfile.async_sendfile(*args, **kwargs) 3406 except OSError as err: 3407 if err.errno == errno.ECONNRESET: 3408 # disconnected 3409 raise 3410 elif err.errno in (errno.EAGAIN, errno.EBUSY): 3411 # we have to retry send data 3412 continue 3413 else: 3414 raise 3415 3416 async def test_send_whole_file(self): 3417 # normal send 3418 total_sent = 0 3419 offset = 0 3420 nbytes = 4096 3421 while total_sent < len(self.DATA): 3422 sent = await self.sendfile_wrapper(self.sockno, self.fileno, 3423 offset, nbytes) 3424 if sent == 0: 3425 break 3426 offset += sent 3427 total_sent += sent 3428 self.assertTrue(sent <= nbytes) 3429 self.assertEqual(offset, total_sent) 3430 3431 self.assertEqual(total_sent, len(self.DATA)) 3432 self.client.shutdown(socket.SHUT_RDWR) 3433 self.client.close() 3434 await self.server.wait_closed() 3435 self.assertEqual(len(self.server_buffer), len(self.DATA)) 3436 self.assertEqual(self.server_buffer, self.DATA) 3437 3438 async def test_send_at_certain_offset(self): 3439 # start sending a file at a certain offset 3440 total_sent = 0 3441 offset = len(self.DATA) // 2 3442 must_send = len(self.DATA) - offset 3443 nbytes = 4096 3444 while total_sent < must_send: 3445 sent = await self.sendfile_wrapper(self.sockno, self.fileno, 3446 offset, nbytes) 3447 if sent == 0: 3448 break 3449 offset += sent 3450 total_sent += sent 3451 self.assertTrue(sent <= nbytes) 3452 3453 self.client.shutdown(socket.SHUT_RDWR) 3454 self.client.close() 3455 await self.server.wait_closed() 3456 expected = self.DATA[len(self.DATA) // 2:] 3457 self.assertEqual(total_sent, len(expected)) 3458 self.assertEqual(len(self.server_buffer), len(expected)) 3459 self.assertEqual(self.server_buffer, expected) 3460 3461 async def test_offset_overflow(self): 3462 # specify an offset > file size 3463 offset = len(self.DATA) + 4096 3464 try: 3465 sent = await self.async_sendfile(self.sockno, self.fileno, 3466 offset, 4096) 3467 except OSError as e: 3468 # Solaris can raise EINVAL if offset >= file length, ignore. 3469 if e.errno != errno.EINVAL: 3470 raise 3471 else: 3472 self.assertEqual(sent, 0) 3473 self.client.shutdown(socket.SHUT_RDWR) 3474 self.client.close() 3475 await self.server.wait_closed() 3476 self.assertEqual(self.server_buffer, b'') 3477 3478 async def test_invalid_offset(self): 3479 with self.assertRaises(OSError) as cm: 3480 await self.async_sendfile(self.sockno, self.fileno, -1, 4096) 3481 self.assertEqual(cm.exception.errno, errno.EINVAL) 3482 3483 async def test_keywords(self): 3484 # Keyword arguments should be supported 3485 await self.async_sendfile(out_fd=self.sockno, in_fd=self.fileno, 3486 offset=0, count=4096) 3487 if self.SUPPORT_HEADERS_TRAILERS: 3488 await self.async_sendfile(out_fd=self.sockno, in_fd=self.fileno, 3489 offset=0, count=4096, 3490 headers=(), trailers=(), flags=0) 3491 3492 # --- headers / trailers tests 3493 3494 @requires_headers_trailers 3495 async def test_headers(self): 3496 total_sent = 0 3497 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1] 3498 sent = await self.async_sendfile(self.sockno, self.fileno, 0, 4096, 3499 headers=[b"x" * 512, b"y" * 256]) 3500 self.assertLessEqual(sent, 512 + 256 + 4096) 3501 total_sent += sent 3502 offset = 4096 3503 while total_sent < len(expected_data): 3504 nbytes = min(len(expected_data) - total_sent, 4096) 3505 sent = await self.sendfile_wrapper(self.sockno, self.fileno, 3506 offset, nbytes) 3507 if sent == 0: 3508 break 3509 self.assertLessEqual(sent, nbytes) 3510 total_sent += sent 3511 offset += sent 3512 3513 self.assertEqual(total_sent, len(expected_data)) 3514 self.client.close() 3515 await self.server.wait_closed() 3516 self.assertEqual(hash(self.server_buffer), hash(expected_data)) 3517 3518 @requires_headers_trailers 3519 async def test_trailers(self): 3520 TESTFN2 = os_helper.TESTFN + "2" 3521 file_data = b"abcdef" 3522 3523 self.addCleanup(os_helper.unlink, TESTFN2) 3524 create_file(TESTFN2, file_data) 3525 3526 with open(TESTFN2, 'rb') as f: 3527 await self.async_sendfile(self.sockno, f.fileno(), 0, 5, 3528 trailers=[b"123456", b"789"]) 3529 self.client.close() 3530 await self.server.wait_closed() 3531 self.assertEqual(self.server_buffer, b"abcde123456789") 3532 3533 @requires_headers_trailers 3534 @requires_32b 3535 async def test_headers_overflow_32bits(self): 3536 self.server.handler_instance.accumulate = False 3537 with self.assertRaises(OSError) as cm: 3538 await self.async_sendfile(self.sockno, self.fileno, 0, 0, 3539 headers=[b"x" * 2**16] * 2**15) 3540 self.assertEqual(cm.exception.errno, errno.EINVAL) 3541 3542 @requires_headers_trailers 3543 @requires_32b 3544 async def test_trailers_overflow_32bits(self): 3545 self.server.handler_instance.accumulate = False 3546 with self.assertRaises(OSError) as cm: 3547 await self.async_sendfile(self.sockno, self.fileno, 0, 0, 3548 trailers=[b"x" * 2**16] * 2**15) 3549 self.assertEqual(cm.exception.errno, errno.EINVAL) 3550 3551 @requires_headers_trailers 3552 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'), 3553 'test needs os.SF_NODISKIO') 3554 async def test_flags(self): 3555 try: 3556 await self.async_sendfile(self.sockno, self.fileno, 0, 4096, 3557 flags=os.SF_NODISKIO) 3558 except OSError as err: 3559 if err.errno not in (errno.EBUSY, errno.EAGAIN): 3560 raise 3561 3562 3563def supports_extended_attributes(): 3564 if not hasattr(os, "setxattr"): 3565 return False 3566 3567 try: 3568 with open(os_helper.TESTFN, "xb", 0) as fp: 3569 try: 3570 os.setxattr(fp.fileno(), b"user.test", b"") 3571 except OSError: 3572 return False 3573 finally: 3574 os_helper.unlink(os_helper.TESTFN) 3575 3576 return True 3577 3578 3579@unittest.skipUnless(supports_extended_attributes(), 3580 "no non-broken extended attribute support") 3581# Kernels < 2.6.39 don't respect setxattr flags. 3582@support.requires_linux_version(2, 6, 39) 3583class ExtendedAttributeTests(unittest.TestCase): 3584 3585 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): 3586 fn = os_helper.TESTFN 3587 self.addCleanup(os_helper.unlink, fn) 3588 create_file(fn) 3589 3590 with self.assertRaises(OSError) as cm: 3591 getxattr(fn, s("user.test"), **kwargs) 3592 self.assertEqual(cm.exception.errno, errno.ENODATA) 3593 3594 init_xattr = listxattr(fn) 3595 self.assertIsInstance(init_xattr, list) 3596 3597 setxattr(fn, s("user.test"), b"", **kwargs) 3598 xattr = set(init_xattr) 3599 xattr.add("user.test") 3600 self.assertEqual(set(listxattr(fn)), xattr) 3601 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") 3602 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) 3603 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") 3604 3605 with self.assertRaises(OSError) as cm: 3606 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) 3607 self.assertEqual(cm.exception.errno, errno.EEXIST) 3608 3609 with self.assertRaises(OSError) as cm: 3610 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) 3611 self.assertEqual(cm.exception.errno, errno.ENODATA) 3612 3613 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) 3614 xattr.add("user.test2") 3615 self.assertEqual(set(listxattr(fn)), xattr) 3616 removexattr(fn, s("user.test"), **kwargs) 3617 3618 with self.assertRaises(OSError) as cm: 3619 getxattr(fn, s("user.test"), **kwargs) 3620 self.assertEqual(cm.exception.errno, errno.ENODATA) 3621 3622 xattr.remove("user.test") 3623 self.assertEqual(set(listxattr(fn)), xattr) 3624 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") 3625 setxattr(fn, s("user.test"), b"a"*1024, **kwargs) 3626 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024) 3627 removexattr(fn, s("user.test"), **kwargs) 3628 many = sorted("user.test{}".format(i) for i in range(100)) 3629 for thing in many: 3630 setxattr(fn, thing, b"x", **kwargs) 3631 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) 3632 3633 def _check_xattrs(self, *args, **kwargs): 3634 self._check_xattrs_str(str, *args, **kwargs) 3635 os_helper.unlink(os_helper.TESTFN) 3636 3637 self._check_xattrs_str(os.fsencode, *args, **kwargs) 3638 os_helper.unlink(os_helper.TESTFN) 3639 3640 def test_simple(self): 3641 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 3642 os.listxattr) 3643 3644 def test_lpath(self): 3645 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 3646 os.listxattr, follow_symlinks=False) 3647 3648 def test_fds(self): 3649 def getxattr(path, *args): 3650 with open(path, "rb") as fp: 3651 return os.getxattr(fp.fileno(), *args) 3652 def setxattr(path, *args): 3653 with open(path, "wb", 0) as fp: 3654 os.setxattr(fp.fileno(), *args) 3655 def removexattr(path, *args): 3656 with open(path, "wb", 0) as fp: 3657 os.removexattr(fp.fileno(), *args) 3658 def listxattr(path, *args): 3659 with open(path, "rb") as fp: 3660 return os.listxattr(fp.fileno(), *args) 3661 self._check_xattrs(getxattr, setxattr, removexattr, listxattr) 3662 3663 3664@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size") 3665class TermsizeTests(unittest.TestCase): 3666 def test_does_not_crash(self): 3667 """Check if get_terminal_size() returns a meaningful value. 3668 3669 There's no easy portable way to actually check the size of the 3670 terminal, so let's check if it returns something sensible instead. 3671 """ 3672 try: 3673 size = os.get_terminal_size() 3674 except OSError as e: 3675 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY): 3676 # Under win32 a generic OSError can be thrown if the 3677 # handle cannot be retrieved 3678 self.skipTest("failed to query terminal size") 3679 raise 3680 3681 self.assertGreaterEqual(size.columns, 0) 3682 self.assertGreaterEqual(size.lines, 0) 3683 3684 def test_stty_match(self): 3685 """Check if stty returns the same results 3686 3687 stty actually tests stdin, so get_terminal_size is invoked on 3688 stdin explicitly. If stty succeeded, then get_terminal_size() 3689 should work too. 3690 """ 3691 try: 3692 size = ( 3693 subprocess.check_output( 3694 ["stty", "size"], stderr=subprocess.DEVNULL, text=True 3695 ).split() 3696 ) 3697 except (FileNotFoundError, subprocess.CalledProcessError, 3698 PermissionError): 3699 self.skipTest("stty invocation failed") 3700 expected = (int(size[1]), int(size[0])) # reversed order 3701 3702 try: 3703 actual = os.get_terminal_size(sys.__stdin__.fileno()) 3704 except OSError as e: 3705 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY): 3706 # Under win32 a generic OSError can be thrown if the 3707 # handle cannot be retrieved 3708 self.skipTest("failed to query terminal size") 3709 raise 3710 self.assertEqual(expected, actual) 3711 3712 3713@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create') 3714@support.requires_linux_version(3, 17) 3715class MemfdCreateTests(unittest.TestCase): 3716 def test_memfd_create(self): 3717 fd = os.memfd_create("Hi", os.MFD_CLOEXEC) 3718 self.assertNotEqual(fd, -1) 3719 self.addCleanup(os.close, fd) 3720 self.assertFalse(os.get_inheritable(fd)) 3721 with open(fd, "wb", closefd=False) as f: 3722 f.write(b'memfd_create') 3723 self.assertEqual(f.tell(), 12) 3724 3725 fd2 = os.memfd_create("Hi") 3726 self.addCleanup(os.close, fd2) 3727 self.assertFalse(os.get_inheritable(fd2)) 3728 3729 3730@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd') 3731@support.requires_linux_version(2, 6, 30) 3732class EventfdTests(unittest.TestCase): 3733 def test_eventfd_initval(self): 3734 def pack(value): 3735 """Pack as native uint64_t 3736 """ 3737 return struct.pack("@Q", value) 3738 size = 8 # read/write 8 bytes 3739 initval = 42 3740 fd = os.eventfd(initval) 3741 self.assertNotEqual(fd, -1) 3742 self.addCleanup(os.close, fd) 3743 self.assertFalse(os.get_inheritable(fd)) 3744 3745 # test with raw read/write 3746 res = os.read(fd, size) 3747 self.assertEqual(res, pack(initval)) 3748 3749 os.write(fd, pack(23)) 3750 res = os.read(fd, size) 3751 self.assertEqual(res, pack(23)) 3752 3753 os.write(fd, pack(40)) 3754 os.write(fd, pack(2)) 3755 res = os.read(fd, size) 3756 self.assertEqual(res, pack(42)) 3757 3758 # test with eventfd_read/eventfd_write 3759 os.eventfd_write(fd, 20) 3760 os.eventfd_write(fd, 3) 3761 res = os.eventfd_read(fd) 3762 self.assertEqual(res, 23) 3763 3764 def test_eventfd_semaphore(self): 3765 initval = 2 3766 flags = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK 3767 fd = os.eventfd(initval, flags) 3768 self.assertNotEqual(fd, -1) 3769 self.addCleanup(os.close, fd) 3770 3771 # semaphore starts has initval 2, two reads return '1' 3772 res = os.eventfd_read(fd) 3773 self.assertEqual(res, 1) 3774 res = os.eventfd_read(fd) 3775 self.assertEqual(res, 1) 3776 # third read would block 3777 with self.assertRaises(BlockingIOError): 3778 os.eventfd_read(fd) 3779 with self.assertRaises(BlockingIOError): 3780 os.read(fd, 8) 3781 3782 # increase semaphore counter, read one 3783 os.eventfd_write(fd, 1) 3784 res = os.eventfd_read(fd) 3785 self.assertEqual(res, 1) 3786 # next read would block, too 3787 with self.assertRaises(BlockingIOError): 3788 os.eventfd_read(fd) 3789 3790 def test_eventfd_select(self): 3791 flags = os.EFD_CLOEXEC | os.EFD_NONBLOCK 3792 fd = os.eventfd(0, flags) 3793 self.assertNotEqual(fd, -1) 3794 self.addCleanup(os.close, fd) 3795 3796 # counter is zero, only writeable 3797 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) 3798 self.assertEqual((rfd, wfd, xfd), ([], [fd], [])) 3799 3800 # counter is non-zero, read and writeable 3801 os.eventfd_write(fd, 23) 3802 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) 3803 self.assertEqual((rfd, wfd, xfd), ([fd], [fd], [])) 3804 self.assertEqual(os.eventfd_read(fd), 23) 3805 3806 # counter at max, only readable 3807 os.eventfd_write(fd, (2**64) - 2) 3808 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) 3809 self.assertEqual((rfd, wfd, xfd), ([fd], [], [])) 3810 os.eventfd_read(fd) 3811 3812 3813class OSErrorTests(unittest.TestCase): 3814 def setUp(self): 3815 class Str(str): 3816 pass 3817 3818 self.bytes_filenames = [] 3819 self.unicode_filenames = [] 3820 if os_helper.TESTFN_UNENCODABLE is not None: 3821 decoded = os_helper.TESTFN_UNENCODABLE 3822 else: 3823 decoded = os_helper.TESTFN 3824 self.unicode_filenames.append(decoded) 3825 self.unicode_filenames.append(Str(decoded)) 3826 if os_helper.TESTFN_UNDECODABLE is not None: 3827 encoded = os_helper.TESTFN_UNDECODABLE 3828 else: 3829 encoded = os.fsencode(os_helper.TESTFN) 3830 self.bytes_filenames.append(encoded) 3831 self.bytes_filenames.append(bytearray(encoded)) 3832 self.bytes_filenames.append(memoryview(encoded)) 3833 3834 self.filenames = self.bytes_filenames + self.unicode_filenames 3835 3836 def test_oserror_filename(self): 3837 funcs = [ 3838 (self.filenames, os.chdir,), 3839 (self.filenames, os.lstat,), 3840 (self.filenames, os.open, os.O_RDONLY), 3841 (self.filenames, os.rmdir,), 3842 (self.filenames, os.stat,), 3843 (self.filenames, os.unlink,), 3844 ] 3845 if sys.platform == "win32": 3846 funcs.extend(( 3847 (self.bytes_filenames, os.rename, b"dst"), 3848 (self.bytes_filenames, os.replace, b"dst"), 3849 (self.unicode_filenames, os.rename, "dst"), 3850 (self.unicode_filenames, os.replace, "dst"), 3851 (self.unicode_filenames, os.listdir, ), 3852 )) 3853 else: 3854 funcs.extend(( 3855 (self.filenames, os.listdir,), 3856 (self.filenames, os.rename, "dst"), 3857 (self.filenames, os.replace, "dst"), 3858 )) 3859 if os_helper.can_chmod(): 3860 funcs.append((self.filenames, os.chmod, 0o777)) 3861 if hasattr(os, "chown"): 3862 funcs.append((self.filenames, os.chown, 0, 0)) 3863 if hasattr(os, "lchown"): 3864 funcs.append((self.filenames, os.lchown, 0, 0)) 3865 if hasattr(os, "truncate"): 3866 funcs.append((self.filenames, os.truncate, 0)) 3867 if hasattr(os, "chflags"): 3868 funcs.append((self.filenames, os.chflags, 0)) 3869 if hasattr(os, "lchflags"): 3870 funcs.append((self.filenames, os.lchflags, 0)) 3871 if hasattr(os, "chroot"): 3872 funcs.append((self.filenames, os.chroot,)) 3873 if hasattr(os, "link"): 3874 if sys.platform == "win32": 3875 funcs.append((self.bytes_filenames, os.link, b"dst")) 3876 funcs.append((self.unicode_filenames, os.link, "dst")) 3877 else: 3878 funcs.append((self.filenames, os.link, "dst")) 3879 if hasattr(os, "listxattr"): 3880 funcs.extend(( 3881 (self.filenames, os.listxattr,), 3882 (self.filenames, os.getxattr, "user.test"), 3883 (self.filenames, os.setxattr, "user.test", b'user'), 3884 (self.filenames, os.removexattr, "user.test"), 3885 )) 3886 if hasattr(os, "lchmod"): 3887 funcs.append((self.filenames, os.lchmod, 0o777)) 3888 if hasattr(os, "readlink"): 3889 funcs.append((self.filenames, os.readlink,)) 3890 3891 3892 for filenames, func, *func_args in funcs: 3893 for name in filenames: 3894 try: 3895 if isinstance(name, (str, bytes)): 3896 func(name, *func_args) 3897 else: 3898 with self.assertWarnsRegex(DeprecationWarning, 'should be'): 3899 func(name, *func_args) 3900 except OSError as err: 3901 self.assertIs(err.filename, name, str(func)) 3902 except UnicodeDecodeError: 3903 pass 3904 else: 3905 self.fail("No exception thrown by {}".format(func)) 3906 3907class CPUCountTests(unittest.TestCase): 3908 def test_cpu_count(self): 3909 cpus = os.cpu_count() 3910 if cpus is not None: 3911 self.assertIsInstance(cpus, int) 3912 self.assertGreater(cpus, 0) 3913 else: 3914 self.skipTest("Could not determine the number of CPUs") 3915 3916 3917# FD inheritance check is only useful for systems with process support. 3918@support.requires_subprocess() 3919class FDInheritanceTests(unittest.TestCase): 3920 def test_get_set_inheritable(self): 3921 fd = os.open(__file__, os.O_RDONLY) 3922 self.addCleanup(os.close, fd) 3923 self.assertEqual(os.get_inheritable(fd), False) 3924 3925 os.set_inheritable(fd, True) 3926 self.assertEqual(os.get_inheritable(fd), True) 3927 3928 @unittest.skipIf(fcntl is None, "need fcntl") 3929 def test_get_inheritable_cloexec(self): 3930 fd = os.open(__file__, os.O_RDONLY) 3931 self.addCleanup(os.close, fd) 3932 self.assertEqual(os.get_inheritable(fd), False) 3933 3934 # clear FD_CLOEXEC flag 3935 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 3936 flags &= ~fcntl.FD_CLOEXEC 3937 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 3938 3939 self.assertEqual(os.get_inheritable(fd), True) 3940 3941 @unittest.skipIf(fcntl is None, "need fcntl") 3942 def test_set_inheritable_cloexec(self): 3943 fd = os.open(__file__, os.O_RDONLY) 3944 self.addCleanup(os.close, fd) 3945 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 3946 fcntl.FD_CLOEXEC) 3947 3948 os.set_inheritable(fd, True) 3949 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 3950 0) 3951 3952 @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH") 3953 def test_get_set_inheritable_o_path(self): 3954 fd = os.open(__file__, os.O_PATH) 3955 self.addCleanup(os.close, fd) 3956 self.assertEqual(os.get_inheritable(fd), False) 3957 3958 os.set_inheritable(fd, True) 3959 self.assertEqual(os.get_inheritable(fd), True) 3960 3961 os.set_inheritable(fd, False) 3962 self.assertEqual(os.get_inheritable(fd), False) 3963 3964 def test_get_set_inheritable_badf(self): 3965 fd = os_helper.make_bad_fd() 3966 3967 with self.assertRaises(OSError) as ctx: 3968 os.get_inheritable(fd) 3969 self.assertEqual(ctx.exception.errno, errno.EBADF) 3970 3971 with self.assertRaises(OSError) as ctx: 3972 os.set_inheritable(fd, True) 3973 self.assertEqual(ctx.exception.errno, errno.EBADF) 3974 3975 with self.assertRaises(OSError) as ctx: 3976 os.set_inheritable(fd, False) 3977 self.assertEqual(ctx.exception.errno, errno.EBADF) 3978 3979 def test_open(self): 3980 fd = os.open(__file__, os.O_RDONLY) 3981 self.addCleanup(os.close, fd) 3982 self.assertEqual(os.get_inheritable(fd), False) 3983 3984 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()") 3985 def test_pipe(self): 3986 rfd, wfd = os.pipe() 3987 self.addCleanup(os.close, rfd) 3988 self.addCleanup(os.close, wfd) 3989 self.assertEqual(os.get_inheritable(rfd), False) 3990 self.assertEqual(os.get_inheritable(wfd), False) 3991 3992 def test_dup(self): 3993 fd1 = os.open(__file__, os.O_RDONLY) 3994 self.addCleanup(os.close, fd1) 3995 3996 fd2 = os.dup(fd1) 3997 self.addCleanup(os.close, fd2) 3998 self.assertEqual(os.get_inheritable(fd2), False) 3999 4000 def test_dup_standard_stream(self): 4001 fd = os.dup(1) 4002 self.addCleanup(os.close, fd) 4003 self.assertGreater(fd, 0) 4004 4005 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 4006 def test_dup_nul(self): 4007 # os.dup() was creating inheritable fds for character files. 4008 fd1 = os.open('NUL', os.O_RDONLY) 4009 self.addCleanup(os.close, fd1) 4010 fd2 = os.dup(fd1) 4011 self.addCleanup(os.close, fd2) 4012 self.assertFalse(os.get_inheritable(fd2)) 4013 4014 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()") 4015 def test_dup2(self): 4016 fd = os.open(__file__, os.O_RDONLY) 4017 self.addCleanup(os.close, fd) 4018 4019 # inheritable by default 4020 fd2 = os.open(__file__, os.O_RDONLY) 4021 self.addCleanup(os.close, fd2) 4022 self.assertEqual(os.dup2(fd, fd2), fd2) 4023 self.assertTrue(os.get_inheritable(fd2)) 4024 4025 # force non-inheritable 4026 fd3 = os.open(__file__, os.O_RDONLY) 4027 self.addCleanup(os.close, fd3) 4028 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3) 4029 self.assertFalse(os.get_inheritable(fd3)) 4030 4031 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") 4032 def test_openpty(self): 4033 master_fd, slave_fd = os.openpty() 4034 self.addCleanup(os.close, master_fd) 4035 self.addCleanup(os.close, slave_fd) 4036 self.assertEqual(os.get_inheritable(master_fd), False) 4037 self.assertEqual(os.get_inheritable(slave_fd), False) 4038 4039 4040class PathTConverterTests(unittest.TestCase): 4041 # tuples of (function name, allows fd arguments, additional arguments to 4042 # function, cleanup function) 4043 functions = [ 4044 ('stat', True, (), None), 4045 ('lstat', False, (), None), 4046 ('access', False, (os.F_OK,), None), 4047 ('chflags', False, (0,), None), 4048 ('lchflags', False, (0,), None), 4049 ('open', False, (os.O_RDONLY,), getattr(os, 'close', None)), 4050 ] 4051 4052 def test_path_t_converter(self): 4053 str_filename = os_helper.TESTFN 4054 if os.name == 'nt': 4055 bytes_fspath = bytes_filename = None 4056 else: 4057 bytes_filename = os.fsencode(os_helper.TESTFN) 4058 bytes_fspath = FakePath(bytes_filename) 4059 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT) 4060 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 4061 self.addCleanup(os.close, fd) 4062 4063 int_fspath = FakePath(fd) 4064 str_fspath = FakePath(str_filename) 4065 4066 for name, allow_fd, extra_args, cleanup_fn in self.functions: 4067 with self.subTest(name=name): 4068 try: 4069 fn = getattr(os, name) 4070 except AttributeError: 4071 continue 4072 4073 for path in (str_filename, bytes_filename, str_fspath, 4074 bytes_fspath): 4075 if path is None: 4076 continue 4077 with self.subTest(name=name, path=path): 4078 result = fn(path, *extra_args) 4079 if cleanup_fn is not None: 4080 cleanup_fn(result) 4081 4082 with self.assertRaisesRegex( 4083 TypeError, 'to return str or bytes'): 4084 fn(int_fspath, *extra_args) 4085 4086 if allow_fd: 4087 result = fn(fd, *extra_args) # should not fail 4088 if cleanup_fn is not None: 4089 cleanup_fn(result) 4090 else: 4091 with self.assertRaisesRegex( 4092 TypeError, 4093 'os.PathLike'): 4094 fn(fd, *extra_args) 4095 4096 def test_path_t_converter_and_custom_class(self): 4097 msg = r'__fspath__\(\) to return str or bytes, not %s' 4098 with self.assertRaisesRegex(TypeError, msg % r'int'): 4099 os.stat(FakePath(2)) 4100 with self.assertRaisesRegex(TypeError, msg % r'float'): 4101 os.stat(FakePath(2.34)) 4102 with self.assertRaisesRegex(TypeError, msg % r'object'): 4103 os.stat(FakePath(object())) 4104 4105 4106@unittest.skipUnless(hasattr(os, 'get_blocking'), 4107 'needs os.get_blocking() and os.set_blocking()') 4108@unittest.skipIf(support.is_emscripten, "Cannot unset blocking flag") 4109class BlockingTests(unittest.TestCase): 4110 def test_blocking(self): 4111 fd = os.open(__file__, os.O_RDONLY) 4112 self.addCleanup(os.close, fd) 4113 self.assertEqual(os.get_blocking(fd), True) 4114 4115 os.set_blocking(fd, False) 4116 self.assertEqual(os.get_blocking(fd), False) 4117 4118 os.set_blocking(fd, True) 4119 self.assertEqual(os.get_blocking(fd), True) 4120 4121 4122 4123class ExportsTests(unittest.TestCase): 4124 def test_os_all(self): 4125 self.assertIn('open', os.__all__) 4126 self.assertIn('walk', os.__all__) 4127 4128 4129class TestDirEntry(unittest.TestCase): 4130 def setUp(self): 4131 self.path = os.path.realpath(os_helper.TESTFN) 4132 self.addCleanup(os_helper.rmtree, self.path) 4133 os.mkdir(self.path) 4134 4135 def test_uninstantiable(self): 4136 self.assertRaises(TypeError, os.DirEntry) 4137 4138 def test_unpickable(self): 4139 filename = create_file(os.path.join(self.path, "file.txt"), b'python') 4140 entry = [entry for entry in os.scandir(self.path)].pop() 4141 self.assertIsInstance(entry, os.DirEntry) 4142 self.assertEqual(entry.name, "file.txt") 4143 import pickle 4144 self.assertRaises(TypeError, pickle.dumps, entry, filename) 4145 4146 4147class TestScandir(unittest.TestCase): 4148 check_no_resource_warning = warnings_helper.check_no_resource_warning 4149 4150 def setUp(self): 4151 self.path = os.path.realpath(os_helper.TESTFN) 4152 self.bytes_path = os.fsencode(self.path) 4153 self.addCleanup(os_helper.rmtree, self.path) 4154 os.mkdir(self.path) 4155 4156 def create_file(self, name="file.txt"): 4157 path = self.bytes_path if isinstance(name, bytes) else self.path 4158 filename = os.path.join(path, name) 4159 create_file(filename, b'python') 4160 return filename 4161 4162 def get_entries(self, names): 4163 entries = dict((entry.name, entry) 4164 for entry in os.scandir(self.path)) 4165 self.assertEqual(sorted(entries.keys()), names) 4166 return entries 4167 4168 def assert_stat_equal(self, stat1, stat2, skip_fields): 4169 if skip_fields: 4170 for attr in dir(stat1): 4171 if not attr.startswith("st_"): 4172 continue 4173 if attr in ("st_dev", "st_ino", "st_nlink"): 4174 continue 4175 self.assertEqual(getattr(stat1, attr), 4176 getattr(stat2, attr), 4177 (stat1, stat2, attr)) 4178 else: 4179 self.assertEqual(stat1, stat2) 4180 4181 def test_uninstantiable(self): 4182 scandir_iter = os.scandir(self.path) 4183 self.assertRaises(TypeError, type(scandir_iter)) 4184 scandir_iter.close() 4185 4186 def test_unpickable(self): 4187 filename = self.create_file("file.txt") 4188 scandir_iter = os.scandir(self.path) 4189 import pickle 4190 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename) 4191 scandir_iter.close() 4192 4193 def check_entry(self, entry, name, is_dir, is_file, is_symlink): 4194 self.assertIsInstance(entry, os.DirEntry) 4195 self.assertEqual(entry.name, name) 4196 self.assertEqual(entry.path, os.path.join(self.path, name)) 4197 self.assertEqual(entry.inode(), 4198 os.stat(entry.path, follow_symlinks=False).st_ino) 4199 4200 entry_stat = os.stat(entry.path) 4201 self.assertEqual(entry.is_dir(), 4202 stat.S_ISDIR(entry_stat.st_mode)) 4203 self.assertEqual(entry.is_file(), 4204 stat.S_ISREG(entry_stat.st_mode)) 4205 self.assertEqual(entry.is_symlink(), 4206 os.path.islink(entry.path)) 4207 4208 entry_lstat = os.stat(entry.path, follow_symlinks=False) 4209 self.assertEqual(entry.is_dir(follow_symlinks=False), 4210 stat.S_ISDIR(entry_lstat.st_mode)) 4211 self.assertEqual(entry.is_file(follow_symlinks=False), 4212 stat.S_ISREG(entry_lstat.st_mode)) 4213 4214 self.assert_stat_equal(entry.stat(), 4215 entry_stat, 4216 os.name == 'nt' and not is_symlink) 4217 self.assert_stat_equal(entry.stat(follow_symlinks=False), 4218 entry_lstat, 4219 os.name == 'nt') 4220 4221 def test_attributes(self): 4222 link = hasattr(os, 'link') 4223 symlink = os_helper.can_symlink() 4224 4225 dirname = os.path.join(self.path, "dir") 4226 os.mkdir(dirname) 4227 filename = self.create_file("file.txt") 4228 if link: 4229 try: 4230 os.link(filename, os.path.join(self.path, "link_file.txt")) 4231 except PermissionError as e: 4232 self.skipTest('os.link(): %s' % e) 4233 if symlink: 4234 os.symlink(dirname, os.path.join(self.path, "symlink_dir"), 4235 target_is_directory=True) 4236 os.symlink(filename, os.path.join(self.path, "symlink_file.txt")) 4237 4238 names = ['dir', 'file.txt'] 4239 if link: 4240 names.append('link_file.txt') 4241 if symlink: 4242 names.extend(('symlink_dir', 'symlink_file.txt')) 4243 entries = self.get_entries(names) 4244 4245 entry = entries['dir'] 4246 self.check_entry(entry, 'dir', True, False, False) 4247 4248 entry = entries['file.txt'] 4249 self.check_entry(entry, 'file.txt', False, True, False) 4250 4251 if link: 4252 entry = entries['link_file.txt'] 4253 self.check_entry(entry, 'link_file.txt', False, True, False) 4254 4255 if symlink: 4256 entry = entries['symlink_dir'] 4257 self.check_entry(entry, 'symlink_dir', True, False, True) 4258 4259 entry = entries['symlink_file.txt'] 4260 self.check_entry(entry, 'symlink_file.txt', False, True, True) 4261 4262 def get_entry(self, name): 4263 path = self.bytes_path if isinstance(name, bytes) else self.path 4264 entries = list(os.scandir(path)) 4265 self.assertEqual(len(entries), 1) 4266 4267 entry = entries[0] 4268 self.assertEqual(entry.name, name) 4269 return entry 4270 4271 def create_file_entry(self, name='file.txt'): 4272 filename = self.create_file(name=name) 4273 return self.get_entry(os.path.basename(filename)) 4274 4275 def test_current_directory(self): 4276 filename = self.create_file() 4277 old_dir = os.getcwd() 4278 try: 4279 os.chdir(self.path) 4280 4281 # call scandir() without parameter: it must list the content 4282 # of the current directory 4283 entries = dict((entry.name, entry) for entry in os.scandir()) 4284 self.assertEqual(sorted(entries.keys()), 4285 [os.path.basename(filename)]) 4286 finally: 4287 os.chdir(old_dir) 4288 4289 def test_repr(self): 4290 entry = self.create_file_entry() 4291 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>") 4292 4293 def test_fspath_protocol(self): 4294 entry = self.create_file_entry() 4295 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt')) 4296 4297 def test_fspath_protocol_bytes(self): 4298 bytes_filename = os.fsencode('bytesfile.txt') 4299 bytes_entry = self.create_file_entry(name=bytes_filename) 4300 fspath = os.fspath(bytes_entry) 4301 self.assertIsInstance(fspath, bytes) 4302 self.assertEqual(fspath, 4303 os.path.join(os.fsencode(self.path),bytes_filename)) 4304 4305 def test_removed_dir(self): 4306 path = os.path.join(self.path, 'dir') 4307 4308 os.mkdir(path) 4309 entry = self.get_entry('dir') 4310 os.rmdir(path) 4311 4312 # On POSIX, is_dir() result depends if scandir() filled d_type or not 4313 if os.name == 'nt': 4314 self.assertTrue(entry.is_dir()) 4315 self.assertFalse(entry.is_file()) 4316 self.assertFalse(entry.is_symlink()) 4317 if os.name == 'nt': 4318 self.assertRaises(FileNotFoundError, entry.inode) 4319 # don't fail 4320 entry.stat() 4321 entry.stat(follow_symlinks=False) 4322 else: 4323 self.assertGreater(entry.inode(), 0) 4324 self.assertRaises(FileNotFoundError, entry.stat) 4325 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 4326 4327 def test_removed_file(self): 4328 entry = self.create_file_entry() 4329 os.unlink(entry.path) 4330 4331 self.assertFalse(entry.is_dir()) 4332 # On POSIX, is_dir() result depends if scandir() filled d_type or not 4333 if os.name == 'nt': 4334 self.assertTrue(entry.is_file()) 4335 self.assertFalse(entry.is_symlink()) 4336 if os.name == 'nt': 4337 self.assertRaises(FileNotFoundError, entry.inode) 4338 # don't fail 4339 entry.stat() 4340 entry.stat(follow_symlinks=False) 4341 else: 4342 self.assertGreater(entry.inode(), 0) 4343 self.assertRaises(FileNotFoundError, entry.stat) 4344 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 4345 4346 def test_broken_symlink(self): 4347 if not os_helper.can_symlink(): 4348 return self.skipTest('cannot create symbolic link') 4349 4350 filename = self.create_file("file.txt") 4351 os.symlink(filename, 4352 os.path.join(self.path, "symlink.txt")) 4353 entries = self.get_entries(['file.txt', 'symlink.txt']) 4354 entry = entries['symlink.txt'] 4355 os.unlink(filename) 4356 4357 self.assertGreater(entry.inode(), 0) 4358 self.assertFalse(entry.is_dir()) 4359 self.assertFalse(entry.is_file()) # broken symlink returns False 4360 self.assertFalse(entry.is_dir(follow_symlinks=False)) 4361 self.assertFalse(entry.is_file(follow_symlinks=False)) 4362 self.assertTrue(entry.is_symlink()) 4363 self.assertRaises(FileNotFoundError, entry.stat) 4364 # don't fail 4365 entry.stat(follow_symlinks=False) 4366 4367 def test_bytes(self): 4368 self.create_file("file.txt") 4369 4370 path_bytes = os.fsencode(self.path) 4371 entries = list(os.scandir(path_bytes)) 4372 self.assertEqual(len(entries), 1, entries) 4373 entry = entries[0] 4374 4375 self.assertEqual(entry.name, b'file.txt') 4376 self.assertEqual(entry.path, 4377 os.fsencode(os.path.join(self.path, 'file.txt'))) 4378 4379 def test_bytes_like(self): 4380 self.create_file("file.txt") 4381 4382 for cls in bytearray, memoryview: 4383 path_bytes = cls(os.fsencode(self.path)) 4384 with self.assertWarns(DeprecationWarning): 4385 entries = list(os.scandir(path_bytes)) 4386 self.assertEqual(len(entries), 1, entries) 4387 entry = entries[0] 4388 4389 self.assertEqual(entry.name, b'file.txt') 4390 self.assertEqual(entry.path, 4391 os.fsencode(os.path.join(self.path, 'file.txt'))) 4392 self.assertIs(type(entry.name), bytes) 4393 self.assertIs(type(entry.path), bytes) 4394 4395 @unittest.skipUnless(os.listdir in os.supports_fd, 4396 'fd support for listdir required for this test.') 4397 def test_fd(self): 4398 self.assertIn(os.scandir, os.supports_fd) 4399 self.create_file('file.txt') 4400 expected_names = ['file.txt'] 4401 if os_helper.can_symlink(): 4402 os.symlink('file.txt', os.path.join(self.path, 'link')) 4403 expected_names.append('link') 4404 4405 with os_helper.open_dir_fd(self.path) as fd: 4406 with os.scandir(fd) as it: 4407 entries = list(it) 4408 names = [entry.name for entry in entries] 4409 self.assertEqual(sorted(names), expected_names) 4410 self.assertEqual(names, os.listdir(fd)) 4411 for entry in entries: 4412 self.assertEqual(entry.path, entry.name) 4413 self.assertEqual(os.fspath(entry), entry.name) 4414 self.assertEqual(entry.is_symlink(), entry.name == 'link') 4415 if os.stat in os.supports_dir_fd: 4416 st = os.stat(entry.name, dir_fd=fd) 4417 self.assertEqual(entry.stat(), st) 4418 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False) 4419 self.assertEqual(entry.stat(follow_symlinks=False), st) 4420 4421 @unittest.skipIf(support.is_wasi, "WASI maps '' to cwd") 4422 def test_empty_path(self): 4423 self.assertRaises(FileNotFoundError, os.scandir, '') 4424 4425 def test_consume_iterator_twice(self): 4426 self.create_file("file.txt") 4427 iterator = os.scandir(self.path) 4428 4429 entries = list(iterator) 4430 self.assertEqual(len(entries), 1, entries) 4431 4432 # check than consuming the iterator twice doesn't raise exception 4433 entries2 = list(iterator) 4434 self.assertEqual(len(entries2), 0, entries2) 4435 4436 def test_bad_path_type(self): 4437 for obj in [1.234, {}, []]: 4438 self.assertRaises(TypeError, os.scandir, obj) 4439 4440 def test_close(self): 4441 self.create_file("file.txt") 4442 self.create_file("file2.txt") 4443 iterator = os.scandir(self.path) 4444 next(iterator) 4445 iterator.close() 4446 # multiple closes 4447 iterator.close() 4448 with self.check_no_resource_warning(): 4449 del iterator 4450 4451 def test_context_manager(self): 4452 self.create_file("file.txt") 4453 self.create_file("file2.txt") 4454 with os.scandir(self.path) as iterator: 4455 next(iterator) 4456 with self.check_no_resource_warning(): 4457 del iterator 4458 4459 def test_context_manager_close(self): 4460 self.create_file("file.txt") 4461 self.create_file("file2.txt") 4462 with os.scandir(self.path) as iterator: 4463 next(iterator) 4464 iterator.close() 4465 4466 def test_context_manager_exception(self): 4467 self.create_file("file.txt") 4468 self.create_file("file2.txt") 4469 with self.assertRaises(ZeroDivisionError): 4470 with os.scandir(self.path) as iterator: 4471 next(iterator) 4472 1/0 4473 with self.check_no_resource_warning(): 4474 del iterator 4475 4476 def test_resource_warning(self): 4477 self.create_file("file.txt") 4478 self.create_file("file2.txt") 4479 iterator = os.scandir(self.path) 4480 next(iterator) 4481 with self.assertWarns(ResourceWarning): 4482 del iterator 4483 support.gc_collect() 4484 # exhausted iterator 4485 iterator = os.scandir(self.path) 4486 list(iterator) 4487 with self.check_no_resource_warning(): 4488 del iterator 4489 4490 4491class TestPEP519(unittest.TestCase): 4492 4493 # Abstracted so it can be overridden to test pure Python implementation 4494 # if a C version is provided. 4495 fspath = staticmethod(os.fspath) 4496 4497 def test_return_bytes(self): 4498 for b in b'hello', b'goodbye', b'some/path/and/file': 4499 self.assertEqual(b, self.fspath(b)) 4500 4501 def test_return_string(self): 4502 for s in 'hello', 'goodbye', 'some/path/and/file': 4503 self.assertEqual(s, self.fspath(s)) 4504 4505 def test_fsencode_fsdecode(self): 4506 for p in "path/like/object", b"path/like/object": 4507 pathlike = FakePath(p) 4508 4509 self.assertEqual(p, self.fspath(pathlike)) 4510 self.assertEqual(b"path/like/object", os.fsencode(pathlike)) 4511 self.assertEqual("path/like/object", os.fsdecode(pathlike)) 4512 4513 def test_pathlike(self): 4514 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil'))) 4515 self.assertTrue(issubclass(FakePath, os.PathLike)) 4516 self.assertTrue(isinstance(FakePath('x'), os.PathLike)) 4517 4518 def test_garbage_in_exception_out(self): 4519 vapor = type('blah', (), {}) 4520 for o in int, type, os, vapor(): 4521 self.assertRaises(TypeError, self.fspath, o) 4522 4523 def test_argument_required(self): 4524 self.assertRaises(TypeError, self.fspath) 4525 4526 def test_bad_pathlike(self): 4527 # __fspath__ returns a value other than str or bytes. 4528 self.assertRaises(TypeError, self.fspath, FakePath(42)) 4529 # __fspath__ attribute that is not callable. 4530 c = type('foo', (), {}) 4531 c.__fspath__ = 1 4532 self.assertRaises(TypeError, self.fspath, c()) 4533 # __fspath__ raises an exception. 4534 self.assertRaises(ZeroDivisionError, self.fspath, 4535 FakePath(ZeroDivisionError())) 4536 4537 def test_pathlike_subclasshook(self): 4538 # bpo-38878: subclasshook causes subclass checks 4539 # true on abstract implementation. 4540 class A(os.PathLike): 4541 pass 4542 self.assertFalse(issubclass(FakePath, A)) 4543 self.assertTrue(issubclass(FakePath, os.PathLike)) 4544 4545 def test_pathlike_class_getitem(self): 4546 self.assertIsInstance(os.PathLike[bytes], types.GenericAlias) 4547 4548 4549class TimesTests(unittest.TestCase): 4550 def test_times(self): 4551 times = os.times() 4552 self.assertIsInstance(times, os.times_result) 4553 4554 for field in ('user', 'system', 'children_user', 'children_system', 4555 'elapsed'): 4556 value = getattr(times, field) 4557 self.assertIsInstance(value, float) 4558 4559 if os.name == 'nt': 4560 self.assertEqual(times.children_user, 0) 4561 self.assertEqual(times.children_system, 0) 4562 self.assertEqual(times.elapsed, 0) 4563 4564 4565@support.requires_fork() 4566class ForkTests(unittest.TestCase): 4567 def test_fork(self): 4568 # bpo-42540: ensure os.fork() with non-default memory allocator does 4569 # not crash on exit. 4570 code = """if 1: 4571 import os 4572 from test import support 4573 pid = os.fork() 4574 if pid != 0: 4575 support.wait_process(pid, exitcode=0) 4576 """ 4577 assert_python_ok("-c", code) 4578 assert_python_ok("-c", code, PYTHONMALLOC="malloc_debug") 4579 4580 4581# Only test if the C version is provided, otherwise TestPEP519 already tested 4582# the pure Python implementation. 4583if hasattr(os, "_fspath"): 4584 class TestPEP519PurePython(TestPEP519): 4585 4586 """Explicitly test the pure Python implementation of os.fspath().""" 4587 4588 fspath = staticmethod(os._fspath) 4589 4590 4591if __name__ == "__main__": 4592 unittest.main() 4593