import array
import contextlib
import io
import pickle
from test.support import findfile
from test.support.os_helper import TESTFN, unlink


class UnseekableIO(io.FileIO):
    """A ``FileIO`` whose position can be neither queried nor changed.

    ``tell()`` and ``seek()`` always raise ``io.UnsupportedOperation``;
    everything else is inherited from ``io.FileIO`` unchanged.  The tests in
    this module use it to run the audio modules against such a stream.
    """

    def tell(self):
        """Always raise ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation

    def seek(self, *args, **kwargs):
        """Always raise ``io.UnsupportedOperation``; arguments are ignored."""
        raise io.UnsupportedOperation


167db96d56Sopenharmony_ciclass AudioTests:
177db96d56Sopenharmony_ci    close_fd = False
187db96d56Sopenharmony_ci
197db96d56Sopenharmony_ci    def setUp(self):
207db96d56Sopenharmony_ci        self.f = self.fout = None
217db96d56Sopenharmony_ci
227db96d56Sopenharmony_ci    def tearDown(self):
237db96d56Sopenharmony_ci        if self.f is not None:
247db96d56Sopenharmony_ci            self.f.close()
257db96d56Sopenharmony_ci        if self.fout is not None:
267db96d56Sopenharmony_ci            self.fout.close()
277db96d56Sopenharmony_ci        unlink(TESTFN)
287db96d56Sopenharmony_ci
297db96d56Sopenharmony_ci    def check_params(self, f, nchannels, sampwidth, framerate, nframes,
307db96d56Sopenharmony_ci                     comptype, compname):
317db96d56Sopenharmony_ci        self.assertEqual(f.getnchannels(), nchannels)
327db96d56Sopenharmony_ci        self.assertEqual(f.getsampwidth(), sampwidth)
337db96d56Sopenharmony_ci        self.assertEqual(f.getframerate(), framerate)
347db96d56Sopenharmony_ci        self.assertEqual(f.getnframes(), nframes)
357db96d56Sopenharmony_ci        self.assertEqual(f.getcomptype(), comptype)
367db96d56Sopenharmony_ci        self.assertEqual(f.getcompname(), compname)
377db96d56Sopenharmony_ci
387db96d56Sopenharmony_ci        params = f.getparams()
397db96d56Sopenharmony_ci        self.assertEqual(params,
407db96d56Sopenharmony_ci                (nchannels, sampwidth, framerate, nframes, comptype, compname))
417db96d56Sopenharmony_ci        self.assertEqual(params.nchannels, nchannels)
427db96d56Sopenharmony_ci        self.assertEqual(params.sampwidth, sampwidth)
437db96d56Sopenharmony_ci        self.assertEqual(params.framerate, framerate)
447db96d56Sopenharmony_ci        self.assertEqual(params.nframes, nframes)
457db96d56Sopenharmony_ci        self.assertEqual(params.comptype, comptype)
467db96d56Sopenharmony_ci        self.assertEqual(params.compname, compname)
477db96d56Sopenharmony_ci
487db96d56Sopenharmony_ci        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
497db96d56Sopenharmony_ci            dump = pickle.dumps(params, proto)
507db96d56Sopenharmony_ci            self.assertEqual(pickle.loads(dump), params)
517db96d56Sopenharmony_ci
527db96d56Sopenharmony_ci
class AudioWriteTests(AudioTests):
    """Write-side tests shared by the audio file modules.

    This is a mixin: the concrete test class supplies ``module`` (the audio
    module under test) together with ``nchannels``, ``sampwidth``,
    ``framerate``, ``nframes``, ``comptype``, ``compname`` and ``frames``,
    all of which are read here but not defined in this class.
    """

    def create_file(self, testfile):
        """Open *testfile* for writing and apply the class's parameters.

        The writer is also stored in ``self.fout`` so that ``tearDown()``
        closes it if the test does not.  The frame count is left for the
        caller to set.
        """
        writer = self.fout = self.module.open(testfile, 'wb')
        writer.setnchannels(self.nchannels)
        writer.setsampwidth(self.sampwidth)
        writer.setframerate(self.framerate)
        writer.setcomptype(self.comptype, self.compname)
        return writer

    def check_file(self, testfile, nframes, frames):
        """Read *testfile* back and compare it with the expected contents.

        Channel count, sample width and frame rate must equal the class
        attributes; the frame count must equal *nframes* and reading
        *nframes* frames must return *frames*.
        """
        with self.module.open(testfile, 'rb') as reader:
            self.assertEqual(reader.getnchannels(), self.nchannels)
            self.assertEqual(reader.getsampwidth(), self.sampwidth)
            self.assertEqual(reader.getframerate(), self.framerate)
            self.assertEqual(reader.getnframes(), nframes)
            self.assertEqual(reader.readframes(nframes), frames)

    def _write_and_check(self, data):
        """Write *data* to ``TESTFN`` and verify it reads as ``self.frames``."""
        with self.create_file(TESTFN) as writer:
            writer.setnframes(self.nframes)
            writer.writeframes(data)

        self.check_file(TESTFN, self.nframes, self.frames)

    def _check_after_prefix(self, nframes, frames):
        """Verify ``TESTFN`` holds the 13-byte prefix followed by the audio."""
        with open(TESTFN, 'rb') as testfile:
            self.assertEqual(testfile.read(13), b'ababagalamaga')
            self.check_file(testfile, nframes, frames)

    def test_write_params(self):
        """A writer reports the parameters it was configured with."""
        with self.create_file(TESTFN) as writer:
            writer.setnframes(self.nframes)
            writer.writeframes(self.frames)
            self.check_params(writer, self.nchannels, self.sampwidth,
                              self.framerate, self.nframes,
                              self.comptype, self.compname)

    def test_write_context_manager_calls_close(self):
        """Leaving the ``with`` block of a writer calls ``close()``."""
        # Close checks for a minimum header and will raise an error
        # if it is not set, so this proves that close is called.
        with self.assertRaises(self.module.Error):
            with self.module.open(TESTFN, 'wb'):
                pass
        with self.assertRaises(self.module.Error):
            with open(TESTFN, 'wb') as testfile:
                with self.module.open(testfile):
                    pass

    def test_context_manager_with_open_file(self):
        """The context manager works on a caller-supplied file object.

        After the audio object's block exits, the supplied file's ``closed``
        state must equal ``close_fd``.
        """
        with open(TESTFN, 'wb') as testfile:
            with self.module.open(testfile) as f:
                f.setnchannels(self.nchannels)
                f.setsampwidth(self.sampwidth)
                f.setframerate(self.framerate)
                f.setcomptype(self.comptype, self.compname)
            self.assertEqual(testfile.closed, self.close_fd)
        with open(TESTFN, 'rb') as testfile:
            with self.module.open(testfile) as f:
                self.assertFalse(f.getfp().closed)
                params = f.getparams()
                self.assertEqual(params.nchannels, self.nchannels)
                self.assertEqual(params.sampwidth, self.sampwidth)
                self.assertEqual(params.framerate, self.framerate)
            # ``f`` is deliberately inspected after its block has exited.
            if not self.close_fd:
                self.assertIsNone(f.getfp())
            self.assertEqual(testfile.closed, self.close_fd)

    def test_context_manager_with_filename(self):
        """The context manager works when the module opens the file itself."""
        # If the file doesn't get closed, this test won't fail, but it will
        # produce a resource leak warning.
        with self.module.open(TESTFN, 'wb') as f:
            f.setnchannels(self.nchannels)
            f.setsampwidth(self.sampwidth)
            f.setframerate(self.framerate)
            f.setcomptype(self.comptype, self.compname)
        with self.module.open(TESTFN) as f:
            self.assertFalse(f.getfp().closed)
            params = f.getparams()
            self.assertEqual(params.nchannels, self.nchannels)
            self.assertEqual(params.sampwidth, self.sampwidth)
            self.assertEqual(params.framerate, self.framerate)
        # ``f`` is deliberately inspected after its block has exited.
        if not self.close_fd:
            self.assertIsNone(f.getfp())

    def test_write(self):
        """Frames passed as ``self.frames`` itself are written intact."""
        self._write_and_check(self.frames)

    def test_write_bytearray(self):
        """Frames passed as a ``bytearray`` are written intact."""
        self._write_and_check(bytearray(self.frames))

    def test_write_array(self):
        """Frames passed as an ``array.array('h')`` are written intact."""
        self._write_and_check(array.array('h', self.frames))

    def test_write_memoryview(self):
        """Frames passed as a ``memoryview`` are written intact."""
        self._write_and_check(memoryview(self.frames))

    def test_incompleted_write(self):
        """Declaring one frame too many still yields a correct file.

        The audio data is written after a 13-byte prefix; on reading back,
        the file must report the number of frames actually written.
        """
        with open(TESTFN, 'wb') as testfile:
            testfile.write(b'ababagalamaga')
            with self.create_file(testfile) as writer:
                writer.setnframes(self.nframes + 1)
                writer.writeframes(self.frames)

        self._check_after_prefix(self.nframes, self.frames)

    def test_multiple_writes(self):
        """Frames delivered in two ``writeframes()`` calls are concatenated."""
        framesize = self.nchannels * self.sampwidth
        with open(TESTFN, 'wb') as testfile:
            testfile.write(b'ababagalamaga')
            with self.create_file(testfile) as writer:
                writer.setnframes(self.nframes)
                # Everything but the last frame first, then the last frame.
                writer.writeframes(self.frames[:-framesize])
                writer.writeframes(self.frames[-framesize:])

        self._check_after_prefix(self.nframes, self.frames)

    def test_overflowed_write(self):
        """Declaring one frame too few still yields a correct file.

        On reading back, the file must report the number of frames actually
        written and return all of them.
        """
        with open(TESTFN, 'wb') as testfile:
            testfile.write(b'ababagalamaga')
            with self.create_file(testfile) as writer:
                writer.setnframes(self.nframes - 1)
                writer.writeframes(self.frames)

        self._check_after_prefix(self.nframes, self.frames)

    def test_unseekable_read(self):
        """A file can be read through a stream without ``tell``/``seek``."""
        with self.create_file(TESTFN) as writer:
            writer.setnframes(self.nframes)
            writer.writeframes(self.frames)

        with UnseekableIO(TESTFN, 'rb') as testfile:
            self.check_file(testfile, self.nframes, self.frames)

    def test_unseekable_write(self):
        """A file can be written through a stream without ``tell``/``seek``.

        The declared frame count matches the data written.
        """
        with UnseekableIO(TESTFN, 'wb') as testfile:
            with self.create_file(testfile) as writer:
                writer.setnframes(self.nframes)
                writer.writeframes(self.frames)

        self.check_file(TESTFN, self.nframes, self.frames)

    def test_unseekable_incompleted_write(self):
        """Too many frames declared on a stream without ``tell``/``seek``.

        ``writeframes()`` and ``close()`` are each allowed to fail with
        ``OSError``.  The resulting file must report the declared frame
        count (``nframes + 1``) while containing the frames written.
        """
        with UnseekableIO(TESTFN, 'wb') as testfile:
            testfile.write(b'ababagalamaga')
            writer = self.create_file(testfile)
            writer.setnframes(self.nframes + 1)
            with contextlib.suppress(OSError):
                writer.writeframes(self.frames)
            with contextlib.suppress(OSError):
                writer.close()

        self._check_after_prefix(self.nframes + 1, self.frames)

    def test_unseekable_overflowed_write(self):
        """Too few frames declared on a stream without ``tell``/``seek``.

        ``writeframes()`` and ``close()`` are each allowed to fail with
        ``OSError``.  The resulting file must report the declared frame
        count (``nframes - 1``) and read back as all but the last frame.
        """
        with UnseekableIO(TESTFN, 'wb') as testfile:
            testfile.write(b'ababagalamaga')
            writer = self.create_file(testfile)
            writer.setnframes(self.nframes - 1)
            with contextlib.suppress(OSError):
                writer.writeframes(self.frames)
            with contextlib.suppress(OSError):
                writer.close()

        framesize = self.nchannels * self.sampwidth
        self._check_after_prefix(self.nframes - 1, self.frames[:-framesize])


class AudioTestsWithSourceFile(AudioTests):
    """Read-side tests run against a reference sound file.

    This is a mixin: the concrete test class supplies ``module`` (the audio
    module under test), ``sndfilename``, ``sndfilenframes``, ``frames``,
    ``nframes`` and the expected audio parameters, none of which are defined
    in this class.
    """

    @classmethod
    def setUpClass(cls):
        """Locate ``sndfilename`` in the ``audiodata`` subdirectory."""
        cls.sndfilepath = findfile(cls.sndfilename, subdir='audiodata')

    def test_read_params(self):
        """The reference file reports the expected parameters."""
        f = self.f = self.module.open(self.sndfilepath)
        # NOTE(review): the following check is disabled in the original; the
        # reason is not recorded here.
        # self.assertEqual(f.getfp().name, self.sndfilepath)
        self.check_params(f, self.nchannels, self.sampwidth, self.framerate,
                          self.sndfilenframes, self.comptype, self.compname)

    def test_close(self):
        """``close()`` leaves a caller-supplied file as ``close_fd`` says.

        This is checked for a reader and for a writer.  Closing the writer
        raises ``module.Error`` (no parameters were set); closing it a
        second time must do nothing.
        """
        with open(self.sndfilepath, 'rb') as testfile:
            f = self.f = self.module.open(testfile)
            self.assertFalse(testfile.closed)
            f.close()
            self.assertEqual(testfile.closed, self.close_fd)
        with open(TESTFN, 'wb') as testfile:
            fout = self.fout = self.module.open(testfile, 'wb')
            self.assertFalse(testfile.closed)
            with self.assertRaises(self.module.Error):
                fout.close()
            self.assertEqual(testfile.closed, self.close_fd)
            fout.close()  # do nothing

    def test_read(self):
        """``readframes``, ``tell``, ``rewind`` and ``setpos`` agree.

        Positions are counted in frames.  ``setpos()`` must reject a
        negative position and one past the frame count.
        """
        framesize = self.nchannels * self.sampwidth
        chunk1 = self.frames[:2 * framesize]
        chunk2 = self.frames[2 * framesize: 4 * framesize]
        f = self.f = self.module.open(self.sndfilepath)

        # Reading zero frames returns nothing and does not move the position.
        self.assertEqual(f.readframes(0), b'')
        self.assertEqual(f.tell(), 0)
        self.assertEqual(f.readframes(2), chunk1)

        f.rewind()
        pos0 = f.tell()
        self.assertEqual(pos0, 0)
        self.assertEqual(f.readframes(2), chunk1)
        pos2 = f.tell()
        self.assertEqual(pos2, 2)
        self.assertEqual(f.readframes(2), chunk2)

        # Positions obtained from tell() can be fed back to setpos().
        f.setpos(pos2)
        self.assertEqual(f.readframes(2), chunk2)
        f.setpos(pos0)
        self.assertEqual(f.readframes(2), chunk1)

        with self.assertRaises(self.module.Error):
            f.setpos(-1)
        with self.assertRaises(self.module.Error):
            f.setpos(f.getnframes() + 1)

    def test_copy(self):
        """Copying the reference file reproduces its parameters and frames."""
        f = self.f = self.module.open(self.sndfilepath)
        fout = self.fout = self.module.open(TESTFN, 'wb')
        fout.setparams(f.getparams())
        # Copy in chunks of 1, 2, 3, ... frames until every frame has been
        # requested.
        chunk = 0
        remaining = f.getnframes()
        while remaining > 0:
            chunk += 1
            fout.writeframes(f.readframes(chunk))
            remaining -= chunk
        fout.close()
        # Reopen the copy for reading; tearDown() closes it via self.fout.
        fout = self.fout = self.module.open(TESTFN, 'rb')
        f.rewind()
        self.assertEqual(f.getparams(), fout.getparams())
        self.assertEqual(f.readframes(f.getnframes()),
                         fout.readframes(fout.getnframes()))

    def test_read_not_from_start(self):
        """A sound file can be read from the middle of an open file.

        The reference file is copied to ``TESTFN`` after a 13-byte prefix;
        once the prefix has been consumed, the audio module must read the
        rest as usual.
        """
        with open(TESTFN, 'wb') as testfile:
            testfile.write(b'ababagalamaga')
            with open(self.sndfilepath, 'rb') as source:
                testfile.write(source.read())

        with open(TESTFN, 'rb') as testfile:
            self.assertEqual(testfile.read(13), b'ababagalamaga')
            with self.module.open(testfile, 'rb') as f:
                self.assertEqual(f.getnchannels(), self.nchannels)
                self.assertEqual(f.getsampwidth(), self.sampwidth)
                self.assertEqual(f.getframerate(), self.framerate)
                self.assertEqual(f.getnframes(), self.sndfilenframes)
                self.assertEqual(f.readframes(self.nframes), self.frames)