"""
Very minimal unittests for parts of the readline module.
"""
from contextlib import ExitStack
from errno import EIO
import locale
import os
import selectors
import subprocess
import sys
import tempfile
import unittest
from test.support import verbose
from test.support.import_helper import import_module
from test.support.os_helper import unlink, temp_dir, TESTFN
from test.support.script_helper import assert_python_ok

# Skip tests if there is no readline module
readline = import_module('readline')

# Detect whether the module is backed by libedit rather than GNU readline.
# Prefer the library version string when it is exposed and fall back to the
# module docstring otherwise.
if hasattr(readline, "_READLINE_LIBRARY_VERSION"):
    is_editline = ("EditLine wrapper" in readline._READLINE_LIBRARY_VERSION)
else:
    is_editline = bool(readline.__doc__ and "libedit" in readline.__doc__)

# Python implementations other than CPython may not have this private
# attribute (see setUpModule).  Fall back to 0 so that the version-gated
# tests below are skipped instead of raising AttributeError while this
# module is being imported.
_READLINE_VERSION = getattr(readline, "_READLINE_VERSION", 0)


def setUpModule():
    """Print the readline version information when running verbosely."""
    if verbose:
        # Python implementations other than CPython may not have
        # these private attributes
        if hasattr(readline, "_READLINE_VERSION"):
            print(f"readline version: {readline._READLINE_VERSION:#x}")
            print(f"readline runtime version: {readline._READLINE_RUNTIME_VERSION:#x}")
        if hasattr(readline, "_READLINE_LIBRARY_VERSION"):
            print(f"readline library version: {readline._READLINE_LIBRARY_VERSION!r}")
        print(f"use libedit emulation? {is_editline}")


@unittest.skipUnless(hasattr(readline, "clear_history"),
                     "The history update test cannot be run because the "
                     "clear_history method is not available.")
class TestHistoryManipulation (unittest.TestCase):
    """
    These tests were added to check that the libedit emulation on OSX and the
    "real" readline have the same interface for history manipulation. That's
    why the tests cover only a small subset of the interface.
    """

    def testHistoryUpdates(self):
        # get_history_item() is queried with 1-based indexes here (index 0
        # yields None), while replace_history_item() and
        # remove_history_item() are called with 0 to affect the first entry.
        readline.clear_history()

        readline.add_history("first line")
        readline.add_history("second line")

        self.assertEqual(readline.get_history_item(0), None)
        self.assertEqual(readline.get_history_item(1), "first line")
        self.assertEqual(readline.get_history_item(2), "second line")

        readline.replace_history_item(0, "replaced line")
        self.assertEqual(readline.get_history_item(0), None)
        self.assertEqual(readline.get_history_item(1), "replaced line")
        self.assertEqual(readline.get_history_item(2), "second line")

        self.assertEqual(readline.get_current_history_length(), 2)

        readline.remove_history_item(0)
        self.assertEqual(readline.get_history_item(0), None)
        self.assertEqual(readline.get_history_item(1), "second line")

        self.assertEqual(readline.get_current_history_length(), 1)

    @unittest.skipUnless(hasattr(readline, "append_history_file"),
                         "append_history not available")
    def test_write_read_append(self):
        # Only the name of the temporary file is needed: readline opens the
        # file itself, so close our handle right away.
        hfile = tempfile.NamedTemporaryFile(delete=False)
        hfile.close()
        hfilename = hfile.name
        self.addCleanup(unlink, hfilename)

        # test write-clear-read == nop
        readline.clear_history()
        readline.add_history("first line")
        readline.add_history("second line")
        readline.write_history_file(hfilename)

        readline.clear_history()
        self.assertEqual(readline.get_current_history_length(), 0)

        readline.read_history_file(hfilename)
        self.assertEqual(readline.get_current_history_length(), 2)
        self.assertEqual(readline.get_history_item(1), "first line")
        self.assertEqual(readline.get_history_item(2), "second line")

        # test append
        readline.append_history_file(1, hfilename)
        readline.clear_history()
        readline.read_history_file(hfilename)
        self.assertEqual(readline.get_current_history_length(), 3)
        self.assertEqual(readline.get_history_item(1), "first line")
        self.assertEqual(readline.get_history_item(2), "second line")
        self.assertEqual(readline.get_history_item(3), "second line")

        # test 'no such file' behaviour
        os.unlink(hfilename)
        try:
            readline.append_history_file(1, hfilename)
        except FileNotFoundError:
            pass  # Some implementations return this error (libreadline).
        else:
            os.unlink(hfilename)  # Some create it anyways (libedit).
            # If the file wasn't created, unlink will fail.
        # We're just testing that one of the two expected behaviors happens
        # instead of an incorrect error.

        # write_history_file can create the target
        readline.write_history_file(hfilename)

    def test_nonascii_history(self):
        readline.clear_history()
        try:
            readline.add_history("entrée 1")
        except UnicodeEncodeError as err:
            self.skipTest("Locale cannot encode test data: " + format(err))
        readline.add_history("entrée 2")
        readline.replace_history_item(1, "entrée 22")
        # Register the cleanup before writing so that the file is removed
        # even if write_history_file() fails after creating it.  unlink()
        # tolerates a file that does not exist.
        self.addCleanup(unlink, TESTFN)
        readline.write_history_file(TESTFN)
        readline.clear_history()
        readline.read_history_file(TESTFN)
        if is_editline:
            # An add_history() call seems to be required for get_history_
            # item() to register items from the file
            readline.add_history("dummy")
        self.assertEqual(readline.get_history_item(1), "entrée 1")
        self.assertEqual(readline.get_history_item(2), "entrée 22")


class TestReadline(unittest.TestCase):

    @unittest.skipIf(_READLINE_VERSION < 0x0601 and not is_editline,
                     "not supported in this library version")
    def test_init(self):
        # Issue #19884: Ensure that the ANSI sequence "\033[1034h" is not
        # written into stdout when the readline module is imported and stdout
        # is redirected to a pipe.
        _rc, stdout, _stderr = assert_python_ok('-c', 'import readline',
                                                TERM='xterm-256color')
        self.assertEqual(stdout, b'')

    # Child script shared by the two auto-history tests; the placeholder is
    # filled with True or False.
    auto_history_script = """\
import readline
readline.set_auto_history({})
input()
print("History length:", readline.get_current_history_length())
"""

    def test_auto_history_enabled(self):
        output = run_pty(self.auto_history_script.format(True))
        # bpo-44949: Sometimes, the newline character is not written at the
        # end, so don't expect it in the output.
        self.assertIn(b"History length: 1", output)

    def test_auto_history_disabled(self):
        output = run_pty(self.auto_history_script.format(False))
        # bpo-44949: Sometimes, the newline character is not written at the
        # end, so don't expect it in the output.
        self.assertIn(b"History length: 0", output)

    def test_nonascii(self):
        loc = locale.setlocale(locale.LC_CTYPE, None)
        if loc in ('C', 'POSIX'):
            # bpo-29240: On FreeBSD, if the LC_CTYPE locale is C or POSIX,
            # writing and reading non-ASCII bytes into/from a TTY works, but
            # readline or ncurses ignores non-ASCII bytes on read.
            self.skipTest(f"the LC_CTYPE locale is {loc!r}")

        try:
            readline.add_history("\xEB\xEF")
        except UnicodeEncodeError as err:
            self.skipTest("Locale cannot encode test data: " + format(err))

        script = r"""import readline

is_editline = readline.__doc__ and "libedit" in readline.__doc__
inserted = "[\xEFnserted]"
macro = "|t\xEB[after]"
set_pre_input_hook = getattr(readline, "set_pre_input_hook", None)
if is_editline or not set_pre_input_hook:
    # The insert_line() call via pre_input_hook() does nothing with Editline,
    # so include the extra text that would have been inserted here
    macro = inserted + macro

if is_editline:
    readline.parse_and_bind(r'bind ^B ed-prev-char')
    readline.parse_and_bind(r'bind "\t" rl_complete')
    readline.parse_and_bind(r'bind -s ^A "{}"'.format(macro))
else:
    readline.parse_and_bind(r'Control-b: backward-char')
    readline.parse_and_bind(r'"\t": complete')
    readline.parse_and_bind(r'set disable-completion off')
    readline.parse_and_bind(r'set show-all-if-ambiguous off')
    readline.parse_and_bind(r'set show-all-if-unmodified off')
    readline.parse_and_bind(r'Control-a: "{}"'.format(macro))

def pre_input_hook():
    readline.insert_text(inserted)
    readline.redisplay()
if set_pre_input_hook:
    set_pre_input_hook(pre_input_hook)

def completer(text, state):
    if text == "t\xEB":
        if state == 0:
            print("text", ascii(text))
            print("line", ascii(readline.get_line_buffer()))
            print("indexes", readline.get_begidx(), readline.get_endidx())
            return "t\xEBnt"
        if state == 1:
            return "t\xEBxt"
    if text == "t\xEBx" and state == 0:
        return "t\xEBxt"
    return None
readline.set_completer(completer)

def display(substitution, matches, longest_match_length):
    print("substitution", ascii(substitution))
    print("matches", ascii(matches))
readline.set_completion_display_matches_hook(display)

print("result", ascii(input()))
print("history", ascii(readline.get_history_item(1)))
"""

        keystrokes = b"\x01"  # Ctrl-A, expands to "|t\xEB[after]"
        keystrokes += b"\x02" * len("[after]")  # Move cursor back
        keystrokes += b"\t\t"  # Display possible completions
        keystrokes += b"x\t"  # Complete "t\xEBx" -> "t\xEBxt"
        keystrokes += b"\r"
        output = run_pty(script, keystrokes)
        self.assertIn(b"text 't\\xeb'\r\n", output)
        self.assertIn(b"line '[\\xefnserted]|t\\xeb[after]'\r\n", output)
        if sys.platform == "darwin" or not is_editline:
            self.assertIn(b"indexes 11 13\r\n", output)
            # Non-macOS libedit does not handle non-ASCII bytes
            # the same way and generates character indices
            # rather than byte indices via get_begidx() and
            # get_endidx().  Ex: libedit2 3.1-20191231-2 on Debian
            # winds up with "indexes 10 12".  Stemming from the
            # start and end values calls back into readline.c's
            # rl_attempted_completion_function = flex_complete with:
            # (11, 13) instead of libreadline's (12, 15).

        if not is_editline and hasattr(readline, "set_pre_input_hook"):
            self.assertIn(b"substitution 't\\xeb'\r\n", output)
            self.assertIn(b"matches ['t\\xebnt', 't\\xebxt']\r\n", output)
        expected = br"'[\xefnserted]|t\xebxt[after]'"
        self.assertIn(b"result " + expected + b"\r\n", output)
        # bpo-45195: Sometimes, the newline character is not written at the
        # end, so don't expect it in the output.
        self.assertIn(b"history " + expected, output)

    # We have 2 reasons to skip this test:
    # - readline: history size was added in 6.0
    #   See https://cnswww.cns.cwru.edu/php/chet/readline/CHANGES
    # - editline: history size is broken on OS X 10.11.6.
    #   Newer versions were not tested yet.
    @unittest.skipIf(_READLINE_VERSION < 0x600,
                     "this readline version does not support history-size")
    @unittest.skipIf(is_editline,
                     "editline history size configuration is broken")
    def test_history_size(self):
        history_size = 10
        with temp_dir() as test_dir:
            inputrc = os.path.join(test_dir, "inputrc")
            with open(inputrc, "wb") as f:
                f.write(b"set history-size %d\n" % history_size)

            history_file = os.path.join(test_dir, "history")
            with open(history_file, "wb") as f:
                # history_size * 2 items crashes readline
                data = b"".join(b"item %d\n" % i
                                for i in range(history_size * 2))
                f.write(data)

            script = """
import os
import readline

history_file = os.environ["HISTORY_FILE"]
readline.read_history_file(history_file)
input()
readline.write_history_file(history_file)
"""

            # The child locates both files through its environment.
            env = dict(os.environ)
            env["INPUTRC"] = inputrc
            env["HISTORY_FILE"] = history_file

            run_pty(script, input=b"last input\r", env=env)

            with open(history_file, "rb") as f:
                lines = f.readlines()
            self.assertEqual(len(lines), history_size)
            self.assertEqual(lines[-1].strip(), b"last input")


def run_pty(script, input=b"dummy input\r", env=None):
    """Run *script* in a child interpreter attached to a pseudo-terminal.

    The script is executed with ``sys.executable -c`` and has its stdin,
    stdout and stderr connected to the slave side of a new pty.  *input* is
    written to the master side, and everything read back from the master
    side is accumulated and returned as a bytearray once a read yields no
    data (an empty read or EIO).  *env* is passed to subprocess.Popen
    unchanged.
    """
    pty = import_module('pty')
    output = bytearray()
    master, slave = pty.openpty()
    args = (sys.executable, '-c', script)
    try:
        proc = subprocess.Popen(args, stdin=slave, stdout=slave, stderr=slave,
                                env=env)
    except BaseException:
        # Nothing owns the master descriptor yet, so close it here rather
        # than leaking it when the child cannot be started.
        os.close(master)
        raise
    finally:
        # The parent never uses the slave side itself.
        os.close(slave)
    with ExitStack() as cleanup:
        cleanup.enter_context(proc)

        def terminate(proc):
            try:
                proc.terminate()
            except ProcessLookupError:
                # Workaround for Open/Net BSD bug (Issue 16762)
                pass
        cleanup.callback(terminate, proc)
        cleanup.callback(os.close, master)
        # Avoid using DefaultSelector and PollSelector. Kqueue() does not
        # work with pseudo-terminals on OS X < 10.9 (Issue 20365) and Open
        # BSD (Issue 20667). Poll() does not work with OS X 10.6 or 10.4
        # either (Issue 20472). Hopefully the file descriptor is low enough
        # to use with select().
        sel = cleanup.enter_context(selectors.SelectSelector())
        sel.register(master, selectors.EVENT_READ | selectors.EVENT_WRITE)
        os.set_blocking(master, False)
        while True:
            for _, events in sel.select():
                if events & selectors.EVENT_READ:
                    try:
                        chunk = os.read(master, 0x10000)
                    except OSError as err:
                        # Linux raises EIO when slave is closed (Issue 5380)
                        if err.errno != EIO:
                            raise
                        chunk = b""
                    if not chunk:
                        return output
                    output.extend(chunk)
                if events & selectors.EVENT_WRITE:
                    try:
                        # os.write() may accept only part of the data; keep
                        # whatever is left for the next round.
                        input = input[os.write(master, input):]
                    except OSError as err:
                        # Apparently EIO means the slave was closed
                        if err.errno != EIO:
                            raise
                        input = b""  # Stop writing
                    if not input:
                        # Nothing left to send: only wait for output now.
                        sel.modify(master, selectors.EVENT_READ)


if __name__ == "__main__":
    unittest.main()