17db96d56Sopenharmony_ci# 27db96d56Sopenharmony_ci# multibytecodec_support.py 37db96d56Sopenharmony_ci# Common Unittest Routines for CJK codecs 47db96d56Sopenharmony_ci# 57db96d56Sopenharmony_ci 67db96d56Sopenharmony_ciimport codecs 77db96d56Sopenharmony_ciimport os 87db96d56Sopenharmony_ciimport re 97db96d56Sopenharmony_ciimport sys 107db96d56Sopenharmony_ciimport unittest 117db96d56Sopenharmony_cifrom http.client import HTTPException 127db96d56Sopenharmony_cifrom test import support 137db96d56Sopenharmony_cifrom io import BytesIO 147db96d56Sopenharmony_ci 157db96d56Sopenharmony_ciclass TestBase: 167db96d56Sopenharmony_ci encoding = '' # codec name 177db96d56Sopenharmony_ci codec = None # codec tuple (with 4 elements) 187db96d56Sopenharmony_ci tstring = None # must set. 2 strings to test StreamReader 197db96d56Sopenharmony_ci 207db96d56Sopenharmony_ci codectests = None # must set. codec test tuple 217db96d56Sopenharmony_ci roundtriptest = 1 # set if roundtrip is possible with unicode 227db96d56Sopenharmony_ci has_iso10646 = 0 # set if this encoding contains whole iso10646 map 237db96d56Sopenharmony_ci xmlcharnametest = None # string to test xmlcharrefreplace 247db96d56Sopenharmony_ci unmappedunicode = '\udeee' # a unicode code point that is not mapped. 257db96d56Sopenharmony_ci 267db96d56Sopenharmony_ci def setUp(self): 277db96d56Sopenharmony_ci if self.codec is None: 287db96d56Sopenharmony_ci self.codec = codecs.lookup(self.encoding) 297db96d56Sopenharmony_ci self.encode = self.codec.encode 307db96d56Sopenharmony_ci self.decode = self.codec.decode 317db96d56Sopenharmony_ci self.reader = self.codec.streamreader 327db96d56Sopenharmony_ci self.writer = self.codec.streamwriter 337db96d56Sopenharmony_ci self.incrementalencoder = self.codec.incrementalencoder 347db96d56Sopenharmony_ci self.incrementaldecoder = self.codec.incrementaldecoder 357db96d56Sopenharmony_ci 367db96d56Sopenharmony_ci def test_chunkcoding(self): 377db96d56Sopenharmony_ci tstring_lines = [] 387db96d56Sopenharmony_ci for b in self.tstring: 397db96d56Sopenharmony_ci lines = b.split(b"\n") 407db96d56Sopenharmony_ci last = lines.pop() 417db96d56Sopenharmony_ci assert last == b"" 427db96d56Sopenharmony_ci lines = [line + b"\n" for line in lines] 437db96d56Sopenharmony_ci tstring_lines.append(lines) 447db96d56Sopenharmony_ci for native, utf8 in zip(*tstring_lines): 457db96d56Sopenharmony_ci u = self.decode(native)[0] 467db96d56Sopenharmony_ci self.assertEqual(u, utf8.decode('utf-8')) 477db96d56Sopenharmony_ci if self.roundtriptest: 487db96d56Sopenharmony_ci self.assertEqual(native, self.encode(u)[0]) 497db96d56Sopenharmony_ci 507db96d56Sopenharmony_ci def test_errorhandle(self): 517db96d56Sopenharmony_ci for source, scheme, expected in self.codectests: 527db96d56Sopenharmony_ci if isinstance(source, bytes): 537db96d56Sopenharmony_ci func = self.decode 547db96d56Sopenharmony_ci else: 557db96d56Sopenharmony_ci func = self.encode 567db96d56Sopenharmony_ci if expected: 577db96d56Sopenharmony_ci result = func(source, scheme)[0] 587db96d56Sopenharmony_ci if func is self.decode: 597db96d56Sopenharmony_ci self.assertTrue(type(result) is str, type(result)) 607db96d56Sopenharmony_ci self.assertEqual(result, expected, 617db96d56Sopenharmony_ci '%a.decode(%r, %r)=%a != %a' 627db96d56Sopenharmony_ci % (source, self.encoding, scheme, result, 637db96d56Sopenharmony_ci expected)) 647db96d56Sopenharmony_ci else: 657db96d56Sopenharmony_ci self.assertTrue(type(result) is bytes, type(result)) 667db96d56Sopenharmony_ci self.assertEqual(result, expected, 677db96d56Sopenharmony_ci '%a.encode(%r, %r)=%a != %a' 687db96d56Sopenharmony_ci % (source, self.encoding, scheme, result, 697db96d56Sopenharmony_ci expected)) 707db96d56Sopenharmony_ci else: 717db96d56Sopenharmony_ci self.assertRaises(UnicodeError, func, source, scheme) 727db96d56Sopenharmony_ci 737db96d56Sopenharmony_ci def test_xmlcharrefreplace(self): 747db96d56Sopenharmony_ci if self.has_iso10646: 757db96d56Sopenharmony_ci self.skipTest('encoding contains full ISO 10646 map') 767db96d56Sopenharmony_ci 777db96d56Sopenharmony_ci s = "\u0b13\u0b23\u0b60 nd eggs" 787db96d56Sopenharmony_ci self.assertEqual( 797db96d56Sopenharmony_ci self.encode(s, "xmlcharrefreplace")[0], 807db96d56Sopenharmony_ci b"ଓଣୠ nd eggs" 817db96d56Sopenharmony_ci ) 827db96d56Sopenharmony_ci 837db96d56Sopenharmony_ci def test_customreplace_encode(self): 847db96d56Sopenharmony_ci if self.has_iso10646: 857db96d56Sopenharmony_ci self.skipTest('encoding contains full ISO 10646 map') 867db96d56Sopenharmony_ci 877db96d56Sopenharmony_ci from html.entities import codepoint2name 887db96d56Sopenharmony_ci 897db96d56Sopenharmony_ci def xmlcharnamereplace(exc): 907db96d56Sopenharmony_ci if not isinstance(exc, UnicodeEncodeError): 917db96d56Sopenharmony_ci raise TypeError("don't know how to handle %r" % exc) 927db96d56Sopenharmony_ci l = [] 937db96d56Sopenharmony_ci for c in exc.object[exc.start:exc.end]: 947db96d56Sopenharmony_ci if ord(c) in codepoint2name: 957db96d56Sopenharmony_ci l.append("&%s;" % codepoint2name[ord(c)]) 967db96d56Sopenharmony_ci else: 977db96d56Sopenharmony_ci l.append("&#%d;" % ord(c)) 987db96d56Sopenharmony_ci return ("".join(l), exc.end) 997db96d56Sopenharmony_ci 1007db96d56Sopenharmony_ci codecs.register_error("test.xmlcharnamereplace", xmlcharnamereplace) 1017db96d56Sopenharmony_ci 1027db96d56Sopenharmony_ci if self.xmlcharnametest: 1037db96d56Sopenharmony_ci sin, sout = self.xmlcharnametest 1047db96d56Sopenharmony_ci else: 1057db96d56Sopenharmony_ci sin = "\xab\u211c\xbb = \u2329\u1234\u232a" 1067db96d56Sopenharmony_ci sout = b"«ℜ» = ⟨ሴ⟩" 1077db96d56Sopenharmony_ci self.assertEqual(self.encode(sin, 1087db96d56Sopenharmony_ci "test.xmlcharnamereplace")[0], sout) 1097db96d56Sopenharmony_ci 1107db96d56Sopenharmony_ci def test_callback_returns_bytes(self): 1117db96d56Sopenharmony_ci def myreplace(exc): 1127db96d56Sopenharmony_ci return (b"1234", exc.end) 1137db96d56Sopenharmony_ci codecs.register_error("test.cjktest", myreplace) 1147db96d56Sopenharmony_ci enc = self.encode("abc" + self.unmappedunicode + "def", "test.cjktest")[0] 1157db96d56Sopenharmony_ci self.assertEqual(enc, b"abc1234def") 1167db96d56Sopenharmony_ci 1177db96d56Sopenharmony_ci def test_callback_wrong_objects(self): 1187db96d56Sopenharmony_ci def myreplace(exc): 1197db96d56Sopenharmony_ci return (ret, exc.end) 1207db96d56Sopenharmony_ci codecs.register_error("test.cjktest", myreplace) 1217db96d56Sopenharmony_ci 1227db96d56Sopenharmony_ci for ret in ([1, 2, 3], [], None, object()): 1237db96d56Sopenharmony_ci self.assertRaises(TypeError, self.encode, self.unmappedunicode, 1247db96d56Sopenharmony_ci 'test.cjktest') 1257db96d56Sopenharmony_ci 1267db96d56Sopenharmony_ci def test_callback_long_index(self): 1277db96d56Sopenharmony_ci def myreplace(exc): 1287db96d56Sopenharmony_ci return ('x', int(exc.end)) 1297db96d56Sopenharmony_ci codecs.register_error("test.cjktest", myreplace) 1307db96d56Sopenharmony_ci self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh', 1317db96d56Sopenharmony_ci 'test.cjktest'), (b'abcdxefgh', 9)) 1327db96d56Sopenharmony_ci 1337db96d56Sopenharmony_ci def myreplace(exc): 1347db96d56Sopenharmony_ci return ('x', sys.maxsize + 1) 1357db96d56Sopenharmony_ci codecs.register_error("test.cjktest", myreplace) 1367db96d56Sopenharmony_ci self.assertRaises(IndexError, self.encode, self.unmappedunicode, 1377db96d56Sopenharmony_ci 'test.cjktest') 1387db96d56Sopenharmony_ci 1397db96d56Sopenharmony_ci def test_callback_None_index(self): 1407db96d56Sopenharmony_ci def myreplace(exc): 1417db96d56Sopenharmony_ci return ('x', None) 1427db96d56Sopenharmony_ci codecs.register_error("test.cjktest", myreplace) 1437db96d56Sopenharmony_ci self.assertRaises(TypeError, self.encode, self.unmappedunicode, 1447db96d56Sopenharmony_ci 'test.cjktest') 1457db96d56Sopenharmony_ci 1467db96d56Sopenharmony_ci def test_callback_backward_index(self): 1477db96d56Sopenharmony_ci def myreplace(exc): 1487db96d56Sopenharmony_ci if myreplace.limit > 0: 1497db96d56Sopenharmony_ci myreplace.limit -= 1 1507db96d56Sopenharmony_ci return ('REPLACED', 0) 1517db96d56Sopenharmony_ci else: 1527db96d56Sopenharmony_ci return ('TERMINAL', exc.end) 1537db96d56Sopenharmony_ci myreplace.limit = 3 1547db96d56Sopenharmony_ci codecs.register_error("test.cjktest", myreplace) 1557db96d56Sopenharmony_ci self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh', 1567db96d56Sopenharmony_ci 'test.cjktest'), 1577db96d56Sopenharmony_ci (b'abcdREPLACEDabcdREPLACEDabcdREPLACEDabcdTERMINALefgh', 9)) 1587db96d56Sopenharmony_ci 1597db96d56Sopenharmony_ci def test_callback_forward_index(self): 1607db96d56Sopenharmony_ci def myreplace(exc): 1617db96d56Sopenharmony_ci return ('REPLACED', exc.end + 2) 1627db96d56Sopenharmony_ci codecs.register_error("test.cjktest", myreplace) 1637db96d56Sopenharmony_ci self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh', 1647db96d56Sopenharmony_ci 'test.cjktest'), (b'abcdREPLACEDgh', 9)) 1657db96d56Sopenharmony_ci 1667db96d56Sopenharmony_ci def test_callback_index_outofbound(self): 1677db96d56Sopenharmony_ci def myreplace(exc): 1687db96d56Sopenharmony_ci return ('TERM', 100) 1697db96d56Sopenharmony_ci codecs.register_error("test.cjktest", myreplace) 1707db96d56Sopenharmony_ci self.assertRaises(IndexError, self.encode, self.unmappedunicode, 1717db96d56Sopenharmony_ci 'test.cjktest') 1727db96d56Sopenharmony_ci 1737db96d56Sopenharmony_ci def test_incrementalencoder(self): 1747db96d56Sopenharmony_ci UTF8Reader = codecs.getreader('utf-8') 1757db96d56Sopenharmony_ci for sizehint in [None] + list(range(1, 33)) + \ 1767db96d56Sopenharmony_ci [64, 128, 256, 512, 1024]: 1777db96d56Sopenharmony_ci istream = UTF8Reader(BytesIO(self.tstring[1])) 1787db96d56Sopenharmony_ci ostream = BytesIO() 1797db96d56Sopenharmony_ci encoder = self.incrementalencoder() 1807db96d56Sopenharmony_ci while 1: 1817db96d56Sopenharmony_ci if sizehint is not None: 1827db96d56Sopenharmony_ci data = istream.read(sizehint) 1837db96d56Sopenharmony_ci else: 1847db96d56Sopenharmony_ci data = istream.read() 1857db96d56Sopenharmony_ci 1867db96d56Sopenharmony_ci if not data: 1877db96d56Sopenharmony_ci break 1887db96d56Sopenharmony_ci e = encoder.encode(data) 1897db96d56Sopenharmony_ci ostream.write(e) 1907db96d56Sopenharmony_ci 1917db96d56Sopenharmony_ci self.assertEqual(ostream.getvalue(), self.tstring[0]) 1927db96d56Sopenharmony_ci 1937db96d56Sopenharmony_ci def test_incrementaldecoder(self): 1947db96d56Sopenharmony_ci UTF8Writer = codecs.getwriter('utf-8') 1957db96d56Sopenharmony_ci for sizehint in [None, -1] + list(range(1, 33)) + \ 1967db96d56Sopenharmony_ci [64, 128, 256, 512, 1024]: 1977db96d56Sopenharmony_ci istream = BytesIO(self.tstring[0]) 1987db96d56Sopenharmony_ci ostream = UTF8Writer(BytesIO()) 1997db96d56Sopenharmony_ci decoder = self.incrementaldecoder() 2007db96d56Sopenharmony_ci while 1: 2017db96d56Sopenharmony_ci data = istream.read(sizehint) 2027db96d56Sopenharmony_ci if not data: 2037db96d56Sopenharmony_ci break 2047db96d56Sopenharmony_ci else: 2057db96d56Sopenharmony_ci u = decoder.decode(data) 2067db96d56Sopenharmony_ci ostream.write(u) 2077db96d56Sopenharmony_ci 2087db96d56Sopenharmony_ci self.assertEqual(ostream.getvalue(), self.tstring[1]) 2097db96d56Sopenharmony_ci 2107db96d56Sopenharmony_ci def test_incrementalencoder_error_callback(self): 2117db96d56Sopenharmony_ci inv = self.unmappedunicode 2127db96d56Sopenharmony_ci 2137db96d56Sopenharmony_ci e = self.incrementalencoder() 2147db96d56Sopenharmony_ci self.assertRaises(UnicodeEncodeError, e.encode, inv, True) 2157db96d56Sopenharmony_ci 2167db96d56Sopenharmony_ci e.errors = 'ignore' 2177db96d56Sopenharmony_ci self.assertEqual(e.encode(inv, True), b'') 2187db96d56Sopenharmony_ci 2197db96d56Sopenharmony_ci e.reset() 2207db96d56Sopenharmony_ci def tempreplace(exc): 2217db96d56Sopenharmony_ci return ('called', exc.end) 2227db96d56Sopenharmony_ci codecs.register_error('test.incremental_error_callback', tempreplace) 2237db96d56Sopenharmony_ci e.errors = 'test.incremental_error_callback' 2247db96d56Sopenharmony_ci self.assertEqual(e.encode(inv, True), b'called') 2257db96d56Sopenharmony_ci 2267db96d56Sopenharmony_ci # again 2277db96d56Sopenharmony_ci e.errors = 'ignore' 2287db96d56Sopenharmony_ci self.assertEqual(e.encode(inv, True), b'') 2297db96d56Sopenharmony_ci 2307db96d56Sopenharmony_ci def test_streamreader(self): 2317db96d56Sopenharmony_ci UTF8Writer = codecs.getwriter('utf-8') 2327db96d56Sopenharmony_ci for name in ["read", "readline", "readlines"]: 2337db96d56Sopenharmony_ci for sizehint in [None, -1] + list(range(1, 33)) + \ 2347db96d56Sopenharmony_ci [64, 128, 256, 512, 1024]: 2357db96d56Sopenharmony_ci istream = self.reader(BytesIO(self.tstring[0])) 2367db96d56Sopenharmony_ci ostream = UTF8Writer(BytesIO()) 2377db96d56Sopenharmony_ci func = getattr(istream, name) 2387db96d56Sopenharmony_ci while 1: 2397db96d56Sopenharmony_ci data = func(sizehint) 2407db96d56Sopenharmony_ci if not data: 2417db96d56Sopenharmony_ci break 2427db96d56Sopenharmony_ci if name == "readlines": 2437db96d56Sopenharmony_ci ostream.writelines(data) 2447db96d56Sopenharmony_ci else: 2457db96d56Sopenharmony_ci ostream.write(data) 2467db96d56Sopenharmony_ci 2477db96d56Sopenharmony_ci self.assertEqual(ostream.getvalue(), self.tstring[1]) 2487db96d56Sopenharmony_ci 2497db96d56Sopenharmony_ci def test_streamwriter(self): 2507db96d56Sopenharmony_ci readfuncs = ('read', 'readline', 'readlines') 2517db96d56Sopenharmony_ci UTF8Reader = codecs.getreader('utf-8') 2527db96d56Sopenharmony_ci for name in readfuncs: 2537db96d56Sopenharmony_ci for sizehint in [None] + list(range(1, 33)) + \ 2547db96d56Sopenharmony_ci [64, 128, 256, 512, 1024]: 2557db96d56Sopenharmony_ci istream = UTF8Reader(BytesIO(self.tstring[1])) 2567db96d56Sopenharmony_ci ostream = self.writer(BytesIO()) 2577db96d56Sopenharmony_ci func = getattr(istream, name) 2587db96d56Sopenharmony_ci while 1: 2597db96d56Sopenharmony_ci if sizehint is not None: 2607db96d56Sopenharmony_ci data = func(sizehint) 2617db96d56Sopenharmony_ci else: 2627db96d56Sopenharmony_ci data = func() 2637db96d56Sopenharmony_ci 2647db96d56Sopenharmony_ci if not data: 2657db96d56Sopenharmony_ci break 2667db96d56Sopenharmony_ci if name == "readlines": 2677db96d56Sopenharmony_ci ostream.writelines(data) 2687db96d56Sopenharmony_ci else: 2697db96d56Sopenharmony_ci ostream.write(data) 2707db96d56Sopenharmony_ci 2717db96d56Sopenharmony_ci self.assertEqual(ostream.getvalue(), self.tstring[0]) 2727db96d56Sopenharmony_ci 2737db96d56Sopenharmony_ci def test_streamwriter_reset_no_pending(self): 2747db96d56Sopenharmony_ci # Issue #23247: Calling reset() on a fresh StreamWriter instance 2757db96d56Sopenharmony_ci # (without pending data) must not crash 2767db96d56Sopenharmony_ci stream = BytesIO() 2777db96d56Sopenharmony_ci writer = self.writer(stream) 2787db96d56Sopenharmony_ci writer.reset() 2797db96d56Sopenharmony_ci 2807db96d56Sopenharmony_ci def test_incrementalencoder_del_segfault(self): 2817db96d56Sopenharmony_ci e = self.incrementalencoder() 2827db96d56Sopenharmony_ci with self.assertRaises(AttributeError): 2837db96d56Sopenharmony_ci del e.errors 2847db96d56Sopenharmony_ci 2857db96d56Sopenharmony_ci 2867db96d56Sopenharmony_ciclass TestBase_Mapping(unittest.TestCase): 2877db96d56Sopenharmony_ci pass_enctest = [] 2887db96d56Sopenharmony_ci pass_dectest = [] 2897db96d56Sopenharmony_ci supmaps = [] 2907db96d56Sopenharmony_ci codectests = [] 2917db96d56Sopenharmony_ci 2927db96d56Sopenharmony_ci def setUp(self): 2937db96d56Sopenharmony_ci try: 2947db96d56Sopenharmony_ci self.open_mapping_file().close() # test it to report the error early 2957db96d56Sopenharmony_ci except (OSError, HTTPException): 2967db96d56Sopenharmony_ci self.skipTest("Could not retrieve "+self.mapfileurl) 2977db96d56Sopenharmony_ci 2987db96d56Sopenharmony_ci def open_mapping_file(self): 2997db96d56Sopenharmony_ci return support.open_urlresource(self.mapfileurl, encoding="utf-8") 3007db96d56Sopenharmony_ci 3017db96d56Sopenharmony_ci def test_mapping_file(self): 3027db96d56Sopenharmony_ci if self.mapfileurl.endswith('.xml'): 3037db96d56Sopenharmony_ci self._test_mapping_file_ucm() 3047db96d56Sopenharmony_ci else: 3057db96d56Sopenharmony_ci self._test_mapping_file_plain() 3067db96d56Sopenharmony_ci 3077db96d56Sopenharmony_ci def _test_mapping_file_plain(self): 3087db96d56Sopenharmony_ci def unichrs(s): 3097db96d56Sopenharmony_ci return ''.join(chr(int(x, 16)) for x in s.split('+')) 3107db96d56Sopenharmony_ci 3117db96d56Sopenharmony_ci urt_wa = {} 3127db96d56Sopenharmony_ci 3137db96d56Sopenharmony_ci with self.open_mapping_file() as f: 3147db96d56Sopenharmony_ci for line in f: 3157db96d56Sopenharmony_ci if not line: 3167db96d56Sopenharmony_ci break 3177db96d56Sopenharmony_ci data = line.split('#')[0].split() 3187db96d56Sopenharmony_ci if len(data) != 2: 3197db96d56Sopenharmony_ci continue 3207db96d56Sopenharmony_ci 3217db96d56Sopenharmony_ci if data[0][:2] != '0x': 3227db96d56Sopenharmony_ci self.fail(f"Invalid line: {line!r}") 3237db96d56Sopenharmony_ci csetch = bytes.fromhex(data[0][2:]) 3247db96d56Sopenharmony_ci if len(csetch) == 1 and 0x80 <= csetch[0]: 3257db96d56Sopenharmony_ci continue 3267db96d56Sopenharmony_ci 3277db96d56Sopenharmony_ci unich = unichrs(data[1]) 3287db96d56Sopenharmony_ci if ord(unich) == 0xfffd or unich in urt_wa: 3297db96d56Sopenharmony_ci continue 3307db96d56Sopenharmony_ci urt_wa[unich] = csetch 3317db96d56Sopenharmony_ci 3327db96d56Sopenharmony_ci self._testpoint(csetch, unich) 3337db96d56Sopenharmony_ci 3347db96d56Sopenharmony_ci def _test_mapping_file_ucm(self): 3357db96d56Sopenharmony_ci with self.open_mapping_file() as f: 3367db96d56Sopenharmony_ci ucmdata = f.read() 3377db96d56Sopenharmony_ci uc = re.findall('<a u="([A-F0-9]{4})" b="([0-9A-F ]+)"/>', ucmdata) 3387db96d56Sopenharmony_ci for uni, coded in uc: 3397db96d56Sopenharmony_ci unich = chr(int(uni, 16)) 3407db96d56Sopenharmony_ci codech = bytes.fromhex(coded) 3417db96d56Sopenharmony_ci self._testpoint(codech, unich) 3427db96d56Sopenharmony_ci 3437db96d56Sopenharmony_ci def test_mapping_supplemental(self): 3447db96d56Sopenharmony_ci for mapping in self.supmaps: 3457db96d56Sopenharmony_ci self._testpoint(*mapping) 3467db96d56Sopenharmony_ci 3477db96d56Sopenharmony_ci def _testpoint(self, csetch, unich): 3487db96d56Sopenharmony_ci if (csetch, unich) not in self.pass_enctest: 3497db96d56Sopenharmony_ci self.assertEqual(unich.encode(self.encoding), csetch) 3507db96d56Sopenharmony_ci if (csetch, unich) not in self.pass_dectest: 3517db96d56Sopenharmony_ci self.assertEqual(str(csetch, self.encoding), unich) 3527db96d56Sopenharmony_ci 3537db96d56Sopenharmony_ci def test_errorhandle(self): 3547db96d56Sopenharmony_ci for source, scheme, expected in self.codectests: 3557db96d56Sopenharmony_ci if isinstance(source, bytes): 3567db96d56Sopenharmony_ci func = source.decode 3577db96d56Sopenharmony_ci else: 3587db96d56Sopenharmony_ci func = source.encode 3597db96d56Sopenharmony_ci if expected: 3607db96d56Sopenharmony_ci if isinstance(source, bytes): 3617db96d56Sopenharmony_ci result = func(self.encoding, scheme) 3627db96d56Sopenharmony_ci self.assertTrue(type(result) is str, type(result)) 3637db96d56Sopenharmony_ci self.assertEqual(result, expected, 3647db96d56Sopenharmony_ci '%a.decode(%r, %r)=%a != %a' 3657db96d56Sopenharmony_ci % (source, self.encoding, scheme, result, 3667db96d56Sopenharmony_ci expected)) 3677db96d56Sopenharmony_ci else: 3687db96d56Sopenharmony_ci result = func(self.encoding, scheme) 3697db96d56Sopenharmony_ci self.assertTrue(type(result) is bytes, type(result)) 3707db96d56Sopenharmony_ci self.assertEqual(result, expected, 3717db96d56Sopenharmony_ci '%a.encode(%r, %r)=%a != %a' 3727db96d56Sopenharmony_ci % (source, self.encoding, scheme, result, 3737db96d56Sopenharmony_ci expected)) 3747db96d56Sopenharmony_ci else: 3757db96d56Sopenharmony_ci self.assertRaises(UnicodeError, func, self.encoding, scheme) 3767db96d56Sopenharmony_ci 3777db96d56Sopenharmony_cidef load_teststring(name): 3787db96d56Sopenharmony_ci dir = os.path.join(os.path.dirname(__file__), 'cjkencodings') 3797db96d56Sopenharmony_ci with open(os.path.join(dir, name + '.txt'), 'rb') as f: 3807db96d56Sopenharmony_ci encoded = f.read() 3817db96d56Sopenharmony_ci with open(os.path.join(dir, name + '-utf8.txt'), 'rb') as f: 3827db96d56Sopenharmony_ci utf8 = f.read() 3837db96d56Sopenharmony_ci return encoded, utf8 384