1import contextlib
2import sys
3import unittest
4from test import support
5from test.support import import_helper
6from test.support import os_helper
7import time
8
9resource = import_helper.import_module('resource')
10
11# This test is checking a few specific problem spots with the resource module.
12
13class ResourceTest(unittest.TestCase):
14
15    def test_args(self):
16        self.assertRaises(TypeError, resource.getrlimit)
17        self.assertRaises(TypeError, resource.getrlimit, 42, 42)
18        self.assertRaises(TypeError, resource.setrlimit)
19        self.assertRaises(TypeError, resource.setrlimit, 42, 42, 42)
20
21    @unittest.skipIf(sys.platform == "vxworks",
22                     "setting RLIMIT_FSIZE is not supported on VxWorks")
23    def test_fsize_ismax(self):
24        try:
25            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
26        except AttributeError:
27            pass
28        else:
29            # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really big
30            # number on a platform with large file support.  On these platforms,
31            # we need to test that the get/setrlimit functions properly convert
32            # the number to a C long long and that the conversion doesn't raise
33            # an error.
34            self.assertEqual(resource.RLIM_INFINITY, max)
35            resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
36
37    def test_fsize_enforced(self):
38        try:
39            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
40        except AttributeError:
41            pass
42        else:
43            # Check to see what happens when the RLIMIT_FSIZE is small.  Some
44            # versions of Python were terminated by an uncaught SIGXFSZ, but
45            # pythonrun.c has been fixed to ignore that exception.  If so, the
46            # write() should return EFBIG when the limit is exceeded.
47
48            # At least one platform has an unlimited RLIMIT_FSIZE and attempts
49            # to change it raise ValueError instead.
50            try:
51                try:
52                    resource.setrlimit(resource.RLIMIT_FSIZE, (1024, max))
53                    limit_set = True
54                except ValueError:
55                    limit_set = False
56                f = open(os_helper.TESTFN, "wb")
57                try:
58                    f.write(b"X" * 1024)
59                    try:
60                        f.write(b"Y")
61                        f.flush()
62                        # On some systems (e.g., Ubuntu on hppa) the flush()
63                        # doesn't always cause the exception, but the close()
64                        # does eventually.  Try flushing several times in
65                        # an attempt to ensure the file is really synced and
66                        # the exception raised.
67                        for i in range(5):
68                            time.sleep(.1)
69                            f.flush()
70                    except OSError:
71                        if not limit_set:
72                            raise
73                    if limit_set:
74                        # Close will attempt to flush the byte we wrote
75                        # Restore limit first to avoid getting a spurious error
76                        resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
77                finally:
78                    f.close()
79            finally:
80                if limit_set:
81                    resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
82                os_helper.unlink(os_helper.TESTFN)
83
84    def test_fsize_toobig(self):
85        # Be sure that setrlimit is checking for really large values
86        too_big = 10**50
87        try:
88            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
89        except AttributeError:
90            pass
91        else:
92            try:
93                resource.setrlimit(resource.RLIMIT_FSIZE, (too_big, max))
94            except (OverflowError, ValueError):
95                pass
96            try:
97                resource.setrlimit(resource.RLIMIT_FSIZE, (max, too_big))
98            except (OverflowError, ValueError):
99                pass
100
101    @unittest.skipUnless(hasattr(resource, "getrusage"), "needs getrusage")
102    def test_getrusage(self):
103        self.assertRaises(TypeError, resource.getrusage)
104        self.assertRaises(TypeError, resource.getrusage, 42, 42)
105        usageself = resource.getrusage(resource.RUSAGE_SELF)
106        usagechildren = resource.getrusage(resource.RUSAGE_CHILDREN)
107        # May not be available on all systems.
108        try:
109            usageboth = resource.getrusage(resource.RUSAGE_BOTH)
110        except (ValueError, AttributeError):
111            pass
112        try:
113            usage_thread = resource.getrusage(resource.RUSAGE_THREAD)
114        except (ValueError, AttributeError):
115            pass
116
117    # Issue 6083: Reference counting bug
118    @unittest.skipIf(sys.platform == "vxworks",
119                     "setting RLIMIT_CPU is not supported on VxWorks")
120    def test_setrusage_refcount(self):
121        try:
122            limits = resource.getrlimit(resource.RLIMIT_CPU)
123        except AttributeError:
124            pass
125        else:
126            class BadSequence:
127                def __len__(self):
128                    return 2
129                def __getitem__(self, key):
130                    if key in (0, 1):
131                        return len(tuple(range(1000000)))
132                    raise IndexError
133
134            resource.setrlimit(resource.RLIMIT_CPU, BadSequence())
135
136    def test_pagesize(self):
137        pagesize = resource.getpagesize()
138        self.assertIsInstance(pagesize, int)
139        self.assertGreaterEqual(pagesize, 0)
140
141    @unittest.skipUnless(sys.platform == 'linux', 'test requires Linux')
142    def test_linux_constants(self):
143        for attr in ['MSGQUEUE', 'NICE', 'RTPRIO', 'RTTIME', 'SIGPENDING']:
144            with contextlib.suppress(AttributeError):
145                self.assertIsInstance(getattr(resource, 'RLIMIT_' + attr), int)
146
147    def test_freebsd_contants(self):
148        for attr in ['SWAP', 'SBSIZE', 'NPTS']:
149            with contextlib.suppress(AttributeError):
150                self.assertIsInstance(getattr(resource, 'RLIMIT_' + attr), int)
151
152    @unittest.skipUnless(hasattr(resource, 'prlimit'), 'no prlimit')
153    @support.requires_linux_version(2, 6, 36)
154    def test_prlimit(self):
155        self.assertRaises(TypeError, resource.prlimit)
156        self.assertRaises(ProcessLookupError, resource.prlimit,
157                          -1, resource.RLIMIT_AS)
158        limit = resource.getrlimit(resource.RLIMIT_AS)
159        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS), limit)
160        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS, limit),
161                         limit)
162
163    # Issue 20191: Reference counting bug
164    @unittest.skipUnless(hasattr(resource, 'prlimit'), 'no prlimit')
165    @support.requires_linux_version(2, 6, 36)
166    def test_prlimit_refcount(self):
167        class BadSeq:
168            def __len__(self):
169                return 2
170            def __getitem__(self, key):
171                return limits[key] - 1  # new reference
172
173        limits = resource.getrlimit(resource.RLIMIT_AS)
174        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS, BadSeq()),
175                         limits)
176
177
178if __name__ == "__main__":
179    unittest.main()
180