import io
import socket
import datetime
import textwrap
import unittest
import functools
import contextlib
import os.path
import re
import threading

from test import support
from test.support import socket_helper, warnings_helper
# nntplib is deprecated: import it through the helper first so that the
# plain ``from nntplib import ...`` below finds the already-loaded module.
nntplib = warnings_helper.import_deprecated("nntplib")
from nntplib import NNTP, GroupInfo
from unittest.mock import patch
try:
    import ssl
except ImportError:
    # The rest of the module checks ``ssl is None`` to disable TLS tests.
    ssl = None


# Certificate/key file expected to live next to this test module.
certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')

if ssl is not None:
    SSLError = ssl.SSLError
else:
    # Placeholder so ``except SSLError`` clauses stay valid without ssl.
    class SSLError(Exception):
        """Non-existent exception class when we lack SSL support."""
        reason = "This will never be raised."
# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`


class NetworkedNNTPTestsMixin:
    """Tests that talk to a live NNTP server.

    Concrete classes are expected to provide ``NNTP_HOST``, ``GROUP_NAME``,
    ``GROUP_PAT``, ``DESC`` and ``NNTP_CLASS`` attributes, plus a connected
    ``server`` object; ``ssl_context`` may be overridden to pass a context
    to the client class.
    """

    ssl_context = None

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertEqual(str, type(welcome))

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertEqual(str, type(line))

    def test_list(self):
        resp, groups = self.server.list()
        if len(groups) > 0:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if len(groups) > 0:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if len(groups) > 0:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn(self.DESC, desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, 2030)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if len(lines) == 0:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
                           ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertEqual(str, type(line[1]))

    def check_article_resp(self, resp, article, art_num=None):
        """Sanity-check an ArticleInfo returned by head/body/article."""
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    @unittest.skipIf(True, "FIXME: see bpo-32128")
    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)
        # Tolerate running the tests from behind a NNTP virus checker
        denylist = lambda line: line.startswith(b'X-Antivirus')
        filtered_head_lines = [line for line in head.lines
                               if not denylist(line)]
        filtered_lines = [line for line in article.lines
                          if not denylist(line)]
        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities. Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think. Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Clear the class-level server so tearDownClass doesn't quit twice.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Replace every callable ``test_*`` attribute with a wrapper that
        runs it inside ``socket_helper.transient_internet(self.NNTP_HOST)``.
        """
        # Wrap all methods in a transient_internet() exception catcher
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with socket_helper.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_timeout(self):
        with self.assertRaises(ValueError):
            self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)

    def test_with_statement(self):
        def is_connected():
            # ``server`` is resolved late, from the enclosing test's scope.
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        kwargs = dict(
            timeout=support.INTERNET_TIMEOUT,
            usenetrc=False
        )
        if self.ssl_context is not None:
            kwargs["ssl_context"] = self.ssl_context

        try:
            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
            with server:
                self.assertTrue(is_connected())
                self.assertTrue(server.help())
            self.assertFalse(is_connected())

            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
            with server:
                server.quit()
            self.assertFalse(is_connected())
        except SSLError as ssl_err:
            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
            if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
                raise unittest.SkipTest(f"Got {ssl_err} connecting "
                                        f"to {self.NNTP_HOST!r}")
            raise


NetworkedNNTPTestsMixin.wrap_methods()


EOF_ERRORS = (EOFError,)
if ssl is not None:
    EOF_ERRORS += (ssl.SSLEOFError,)


class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests over a plain NNTP connection."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'
    DESC = 'Python'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        """Open one shared connection; skip the class on known
        connection-time failures (small DH key, EOF on connect)."""
        support.requires("network")
        kwargs = dict(
            timeout=support.INTERNET_TIMEOUT,
            usenetrc=False
        )
        if cls.ssl_context is not None:
            kwargs["ssl_context"] = cls.ssl_context
        with socket_helper.transient_internet(cls.NNTP_HOST):
            try:
                cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, **kwargs)
            except SSLError as ssl_err:
                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
                if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
                    raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
                                            f"to {cls.NNTP_HOST!r}")
                print(cls.NNTP_HOST)
                raise
            except EOF_ERRORS:
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")

    @classmethod
    def tearDownClass(cls):
        # test_zzquit sets cls.server to None after quitting.
        if cls.server is not None:
            cls.server.quit()


@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    """Run the networked tests over an implicit-TLS connection."""

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."

    NNTP_HOST = 'nntp.aioe.org'
    # bpo-42794: aioe.test is one of the official groups on this server
    # used for testing: https://news.aioe.org/manual/aioe-hierarchy/
    GROUP_NAME = 'aioe.test'
    GROUP_PAT = 'aioe.*'
    DESC = 'test'

    # None when nntplib was built without SSL support.
    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Disabled as it produces too much data
    test_list = None

    # Disabled as the connection will already be encrypted.
    test_starttls = None

    if ssl is not None:
        ssl_context = ssl._create_unverified_context()
        ssl_context.set_ciphers("DEFAULT")
        ssl_context.maximum_version = ssl.TLSVersion.TLSv1_2


#
# Non-networked tests using a local server (or something mocking it).
#

class _NNTPServerIO(io.RawIOBase):
    """A raw IO object allowing NNTP commands to be received and processed
    by a handler.  The handler can push responses which can then be read
    from the IO object."""

    def __init__(self, handler):
        io.RawIOBase.__init__(self)
        # The channel from the client
        self.c2s = io.BytesIO()
        # The channel to the client
        self.s2c = io.BytesIO()
        self.handler = handler
        self.handler.start(self.c2s.readline, self.push_data)

    def readable(self):
        return True

    def writable(self):
        return True

    def push_data(self, data):
        """Push (buffer) some data to send to the client."""
        # Append at the end of the buffer, then restore the read position.
        pos = self.s2c.tell()
        self.s2c.seek(0, 2)
        self.s2c.write(data)
        self.s2c.seek(pos)

    def write(self, b):
        """The client sends us some data"""
        # Rewind after writing so the handler's readline sees the new bytes.
        pos = self.c2s.tell()
        self.c2s.write(b)
        self.c2s.seek(pos)
        self.handler.process_pending()
        return len(b)

    def readinto(self, buf):
        """The client wants to read a response"""
        self.handler.process_pending()
        b = self.s2c.read(len(buf))
        n = len(b)
        buf[:n] = b
        return n


def make_mock_file(handler):
    """Return ``(sio, file)``: a _NNTPServerIO driven by *handler* and a
    buffered read/write wrapper around it."""
    sio = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    file = io.BufferedRWPair(sio, sio)
    return (sio, file)


class NNTPServer(nntplib.NNTP):
    """NNTP client bound to an already-open file object.

    ``nntplib.NNTP.__init__`` is not called; the file and host are set
    directly before ``_base_init``.
    """

    def __init__(self, f, host, readermode=None):
        self.file = f
        self.host = host
        self._base_init(readermode)

    def _close(self):
        self.file.close()
        del self.file


class MockedNNTPTestsMixin:
    """Creates a mocked ``self.server`` for each test."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Build a fresh handler, mock file and NNTPServer; extra arguments
        are forwarded to NNTPServer."""
        self.handler = self.handler_class()
        self.sio, file = make_mock_file(self.handler)
        self.server = NNTPServer(file, 'test.server', *args, **kwargs)
        return self.server


class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    def setUp(self):
        super().setUp()
        # Replaces the server created by the parent setUp.
        self.make_server(readermode=True)


class NNTPv1Handler:
    """A handler for RFC 977"""

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Store the I/O callables, reset state and push the welcome line."""
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume buffered client input and dispatch to ``handle_*``.

        While a body is expected, lines are collected up to the ``.``
        terminator and passed to the pending callback as ``body=``.
        Raises ValueError for a line without CRLF or when a command
        handler raises.
        """
        if self.in_body:
            while True:
                line = self._readline()
                if not line:
                    # Body not complete yet; wait for more input.
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            cmd, *tokens = line.split()
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError("command failed: {!r}".format(line)) from e
                else:
                    if self.in_body:
                        # The handler called expect_body(): remember it so
                        # it is re-invoked with the body once received.
                        self.body_callback = meth, tokens
                        self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        # Dedent, normalise line endings to CRLF and encode as UTF-8.
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # NOTE(review): whitespace inside this literal was collapsed in the
        # extracted source; the two-space indent on the command lines is
        # assumed -- confirm against upstream.
        self.push_lit("""\
            100 Legal commands
              authinfo user Name|pass Password|generic <prog> <args>
              date
              help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        # As of this writing, sicinfo3.epfl.ch doesn't.)
6677db96d56Sopenharmony_ci 6687db96d56Sopenharmony_ci def handle_XOVER(self, message_spec): 6697db96d56Sopenharmony_ci if message_spec == "57-59": 6707db96d56Sopenharmony_ci self.push_lit( 6717db96d56Sopenharmony_ci "224 Overview information for 57-58 follows\n" 6727db96d56Sopenharmony_ci "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout" 6737db96d56Sopenharmony_ci "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" 6747db96d56Sopenharmony_ci "\tSat, 19 Jun 2010 18:04:08 -0400" 6757db96d56Sopenharmony_ci "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>" 6767db96d56Sopenharmony_ci "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16" 6777db96d56Sopenharmony_ci "\tXref: news.gmane.io gmane.comp.python.authors:57" 6787db96d56Sopenharmony_ci "\n" 6797db96d56Sopenharmony_ci "58\tLooking for a few good bloggers" 6807db96d56Sopenharmony_ci "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" 6817db96d56Sopenharmony_ci "\tThu, 22 Jul 2010 09:14:14 -0400" 6827db96d56Sopenharmony_ci "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>" 6837db96d56Sopenharmony_ci "\t\t6683\t16" 6847db96d56Sopenharmony_ci "\t" 6857db96d56Sopenharmony_ci "\n" 6867db96d56Sopenharmony_ci # A UTF-8 overview line from fr.comp.lang.python 6877db96d56Sopenharmony_ci "59\tRe: Message d'erreur incompréhensible (par moi)" 6887db96d56Sopenharmony_ci "\tEric Brunel <eric.brunel@pragmadev.nospam.com>" 6897db96d56Sopenharmony_ci "\tWed, 15 Sep 2010 18:09:15 +0200" 6907db96d56Sopenharmony_ci "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>" 6917db96d56Sopenharmony_ci "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27" 6927db96d56Sopenharmony_ci "\tXref: saria.nerim.net fr.comp.lang.python:1265" 6937db96d56Sopenharmony_ci "\n" 6947db96d56Sopenharmony_ci ".\n") 6957db96d56Sopenharmony_ci else: 6967db96d56Sopenharmony_ci self.push_lit("""\ 6977db96d56Sopenharmony_ci 224 No articles 6987db96d56Sopenharmony_ci .""") 
6997db96d56Sopenharmony_ci 7007db96d56Sopenharmony_ci def handle_POST(self, *, body=None): 7017db96d56Sopenharmony_ci if body is None: 7027db96d56Sopenharmony_ci if self.allow_posting: 7037db96d56Sopenharmony_ci self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>") 7047db96d56Sopenharmony_ci self.expect_body() 7057db96d56Sopenharmony_ci else: 7067db96d56Sopenharmony_ci self.push_lit("440 Posting not permitted") 7077db96d56Sopenharmony_ci else: 7087db96d56Sopenharmony_ci assert self.allow_posting 7097db96d56Sopenharmony_ci self.push_lit("240 Article received OK") 7107db96d56Sopenharmony_ci self.posted_body = body 7117db96d56Sopenharmony_ci 7127db96d56Sopenharmony_ci def handle_IHAVE(self, message_id, *, body=None): 7137db96d56Sopenharmony_ci if body is None: 7147db96d56Sopenharmony_ci if (self.allow_posting and 7157db96d56Sopenharmony_ci message_id == "<i.am.an.article.you.will.want@example.com>"): 7167db96d56Sopenharmony_ci self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>") 7177db96d56Sopenharmony_ci self.expect_body() 7187db96d56Sopenharmony_ci else: 7197db96d56Sopenharmony_ci self.push_lit("435 Article not wanted") 7207db96d56Sopenharmony_ci else: 7217db96d56Sopenharmony_ci assert self.allow_posting 7227db96d56Sopenharmony_ci self.push_lit("235 Article transferred OK") 7237db96d56Sopenharmony_ci self.posted_body = body 7247db96d56Sopenharmony_ci 7257db96d56Sopenharmony_ci sample_head = """\ 7267db96d56Sopenharmony_ci From: "Demo User" <nobody@example.net> 7277db96d56Sopenharmony_ci Subject: I am just a test article 7287db96d56Sopenharmony_ci Content-Type: text/plain; charset=UTF-8; format=flowed 7297db96d56Sopenharmony_ci Message-ID: <i.am.an.article.you.will.want@example.com>""" 7307db96d56Sopenharmony_ci 7317db96d56Sopenharmony_ci sample_body = """\ 7327db96d56Sopenharmony_ci This is just a test article. 7337db96d56Sopenharmony_ci ..Here is a dot-starting line. 
7347db96d56Sopenharmony_ci 7357db96d56Sopenharmony_ci -- Signed by Andr\xe9.""" 7367db96d56Sopenharmony_ci 7377db96d56Sopenharmony_ci sample_article = sample_head + "\n\n" + sample_body 7387db96d56Sopenharmony_ci 7397db96d56Sopenharmony_ci def handle_ARTICLE(self, message_spec=None): 7407db96d56Sopenharmony_ci if message_spec is None: 7417db96d56Sopenharmony_ci self.push_lit("220 3000237 <45223423@example.com>") 7427db96d56Sopenharmony_ci elif message_spec == "<45223423@example.com>": 7437db96d56Sopenharmony_ci self.push_lit("220 0 <45223423@example.com>") 7447db96d56Sopenharmony_ci elif message_spec == "3000234": 7457db96d56Sopenharmony_ci self.push_lit("220 3000234 <45223423@example.com>") 7467db96d56Sopenharmony_ci else: 7477db96d56Sopenharmony_ci self.push_lit("430 No Such Article Found") 7487db96d56Sopenharmony_ci return 7497db96d56Sopenharmony_ci self.push_lit(self.sample_article) 7507db96d56Sopenharmony_ci self.push_lit(".") 7517db96d56Sopenharmony_ci 7527db96d56Sopenharmony_ci def handle_HEAD(self, message_spec=None): 7537db96d56Sopenharmony_ci if message_spec is None: 7547db96d56Sopenharmony_ci self.push_lit("221 3000237 <45223423@example.com>") 7557db96d56Sopenharmony_ci elif message_spec == "<45223423@example.com>": 7567db96d56Sopenharmony_ci self.push_lit("221 0 <45223423@example.com>") 7577db96d56Sopenharmony_ci elif message_spec == "3000234": 7587db96d56Sopenharmony_ci self.push_lit("221 3000234 <45223423@example.com>") 7597db96d56Sopenharmony_ci else: 7607db96d56Sopenharmony_ci self.push_lit("430 No Such Article Found") 7617db96d56Sopenharmony_ci return 7627db96d56Sopenharmony_ci self.push_lit(self.sample_head) 7637db96d56Sopenharmony_ci self.push_lit(".") 7647db96d56Sopenharmony_ci 7657db96d56Sopenharmony_ci def handle_BODY(self, message_spec=None): 7667db96d56Sopenharmony_ci if message_spec is None: 7677db96d56Sopenharmony_ci self.push_lit("222 3000237 <45223423@example.com>") 7687db96d56Sopenharmony_ci elif message_spec == 
"<45223423@example.com>": 7697db96d56Sopenharmony_ci self.push_lit("222 0 <45223423@example.com>") 7707db96d56Sopenharmony_ci elif message_spec == "3000234": 7717db96d56Sopenharmony_ci self.push_lit("222 3000234 <45223423@example.com>") 7727db96d56Sopenharmony_ci else: 7737db96d56Sopenharmony_ci self.push_lit("430 No Such Article Found") 7747db96d56Sopenharmony_ci return 7757db96d56Sopenharmony_ci self.push_lit(self.sample_body) 7767db96d56Sopenharmony_ci self.push_lit(".") 7777db96d56Sopenharmony_ci 7787db96d56Sopenharmony_ci def handle_AUTHINFO(self, cred_type, data): 7797db96d56Sopenharmony_ci if self._logged_in: 7807db96d56Sopenharmony_ci self.push_lit('502 Already Logged In') 7817db96d56Sopenharmony_ci elif cred_type == 'user': 7827db96d56Sopenharmony_ci if self._user_sent: 7837db96d56Sopenharmony_ci self.push_lit('482 User Credential Already Sent') 7847db96d56Sopenharmony_ci else: 7857db96d56Sopenharmony_ci self.push_lit('381 Password Required') 7867db96d56Sopenharmony_ci self._user_sent = True 7877db96d56Sopenharmony_ci elif cred_type == 'pass': 7887db96d56Sopenharmony_ci self.push_lit('281 Login Successful') 7897db96d56Sopenharmony_ci self._logged_in = True 7907db96d56Sopenharmony_ci else: 7917db96d56Sopenharmony_ci raise Exception('Unknown cred type {}'.format(cred_type)) 7927db96d56Sopenharmony_ci 7937db96d56Sopenharmony_ci 7947db96d56Sopenharmony_ciclass NNTPv2Handler(NNTPv1Handler): 7957db96d56Sopenharmony_ci """A handler for RFC 3977 (NNTP "v2")""" 7967db96d56Sopenharmony_ci 7977db96d56Sopenharmony_ci def handle_CAPABILITIES(self): 7987db96d56Sopenharmony_ci fmt = """\ 7997db96d56Sopenharmony_ci 101 Capability list: 8007db96d56Sopenharmony_ci VERSION 2 3 8017db96d56Sopenharmony_ci IMPLEMENTATION INN 2.5.1{} 8027db96d56Sopenharmony_ci HDR 8037db96d56Sopenharmony_ci LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 8047db96d56Sopenharmony_ci OVER 8057db96d56Sopenharmony_ci POST 8067db96d56Sopenharmony_ci READER 
8077db96d56Sopenharmony_ci .""" 8087db96d56Sopenharmony_ci 8097db96d56Sopenharmony_ci if not self._logged_in: 8107db96d56Sopenharmony_ci self.push_lit(fmt.format('\n AUTHINFO USER')) 8117db96d56Sopenharmony_ci else: 8127db96d56Sopenharmony_ci self.push_lit(fmt.format('')) 8137db96d56Sopenharmony_ci 8147db96d56Sopenharmony_ci def handle_MODE(self, _): 8157db96d56Sopenharmony_ci raise Exception('MODE READER sent despite READER has been advertised') 8167db96d56Sopenharmony_ci 8177db96d56Sopenharmony_ci def handle_OVER(self, message_spec=None): 8187db96d56Sopenharmony_ci return self.handle_XOVER(message_spec) 8197db96d56Sopenharmony_ci 8207db96d56Sopenharmony_ci 8217db96d56Sopenharmony_ciclass CapsAfterLoginNNTPv2Handler(NNTPv2Handler): 8227db96d56Sopenharmony_ci """A handler that allows CAPABILITIES only after login""" 8237db96d56Sopenharmony_ci 8247db96d56Sopenharmony_ci def handle_CAPABILITIES(self): 8257db96d56Sopenharmony_ci if not self._logged_in: 8267db96d56Sopenharmony_ci self.push_lit('480 You must log in.') 8277db96d56Sopenharmony_ci else: 8287db96d56Sopenharmony_ci super().handle_CAPABILITIES() 8297db96d56Sopenharmony_ci 8307db96d56Sopenharmony_ci 8317db96d56Sopenharmony_ciclass ModeSwitchingNNTPv2Handler(NNTPv2Handler): 8327db96d56Sopenharmony_ci """A server that starts in transit mode""" 8337db96d56Sopenharmony_ci 8347db96d56Sopenharmony_ci def __init__(self): 8357db96d56Sopenharmony_ci self._switched = False 8367db96d56Sopenharmony_ci 8377db96d56Sopenharmony_ci def handle_CAPABILITIES(self): 8387db96d56Sopenharmony_ci fmt = """\ 8397db96d56Sopenharmony_ci 101 Capability list: 8407db96d56Sopenharmony_ci VERSION 2 3 8417db96d56Sopenharmony_ci IMPLEMENTATION INN 2.5.1 8427db96d56Sopenharmony_ci HDR 8437db96d56Sopenharmony_ci LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 8447db96d56Sopenharmony_ci OVER 8457db96d56Sopenharmony_ci POST 8467db96d56Sopenharmony_ci {}READER 8477db96d56Sopenharmony_ci .""" 8487db96d56Sopenharmony_ci if 
self._switched: 8497db96d56Sopenharmony_ci self.push_lit(fmt.format('')) 8507db96d56Sopenharmony_ci else: 8517db96d56Sopenharmony_ci self.push_lit(fmt.format('MODE-')) 8527db96d56Sopenharmony_ci 8537db96d56Sopenharmony_ci def handle_MODE(self, what): 8547db96d56Sopenharmony_ci assert not self._switched and what == 'reader' 8557db96d56Sopenharmony_ci self._switched = True 8567db96d56Sopenharmony_ci self.push_lit('200 Posting allowed') 8577db96d56Sopenharmony_ci 8587db96d56Sopenharmony_ci 8597db96d56Sopenharmony_ciclass NNTPv1v2TestsMixin: 8607db96d56Sopenharmony_ci 8617db96d56Sopenharmony_ci def setUp(self): 8627db96d56Sopenharmony_ci super().setUp() 8637db96d56Sopenharmony_ci 8647db96d56Sopenharmony_ci def test_welcome(self): 8657db96d56Sopenharmony_ci self.assertEqual(self.server.welcome, self.handler.welcome) 8667db96d56Sopenharmony_ci 8677db96d56Sopenharmony_ci def test_authinfo(self): 8687db96d56Sopenharmony_ci if self.nntp_version == 2: 8697db96d56Sopenharmony_ci self.assertIn('AUTHINFO', self.server._caps) 8707db96d56Sopenharmony_ci self.server.login('testuser', 'testpw') 8717db96d56Sopenharmony_ci # if AUTHINFO is gone from _caps we also know that getcapabilities() 8727db96d56Sopenharmony_ci # has been called after login as it should 8737db96d56Sopenharmony_ci self.assertNotIn('AUTHINFO', self.server._caps) 8747db96d56Sopenharmony_ci 8757db96d56Sopenharmony_ci def test_date(self): 8767db96d56Sopenharmony_ci resp, date = self.server.date() 8777db96d56Sopenharmony_ci self.assertEqual(resp, "111 20100914001155") 8787db96d56Sopenharmony_ci self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55)) 8797db96d56Sopenharmony_ci 8807db96d56Sopenharmony_ci def test_quit(self): 8817db96d56Sopenharmony_ci self.assertFalse(self.sio.closed) 8827db96d56Sopenharmony_ci resp = self.server.quit() 8837db96d56Sopenharmony_ci self.assertEqual(resp, "205 Bye!") 8847db96d56Sopenharmony_ci self.assertTrue(self.sio.closed) 8857db96d56Sopenharmony_ci 8867db96d56Sopenharmony_ci 
def test_help(self): 8877db96d56Sopenharmony_ci resp, help = self.server.help() 8887db96d56Sopenharmony_ci self.assertEqual(resp, "100 Legal commands") 8897db96d56Sopenharmony_ci self.assertEqual(help, [ 8907db96d56Sopenharmony_ci ' authinfo user Name|pass Password|generic <prog> <args>', 8917db96d56Sopenharmony_ci ' date', 8927db96d56Sopenharmony_ci ' help', 8937db96d56Sopenharmony_ci 'Report problems to <root@example.org>', 8947db96d56Sopenharmony_ci ]) 8957db96d56Sopenharmony_ci 8967db96d56Sopenharmony_ci def test_list(self): 8977db96d56Sopenharmony_ci resp, groups = self.server.list() 8987db96d56Sopenharmony_ci self.assertEqual(len(groups), 6) 8997db96d56Sopenharmony_ci g = groups[1] 9007db96d56Sopenharmony_ci self.assertEqual(g, 9017db96d56Sopenharmony_ci GroupInfo("comp.lang.python.announce", "0000001153", 9027db96d56Sopenharmony_ci "0000000993", "m")) 9037db96d56Sopenharmony_ci resp, groups = self.server.list("*distutils*") 9047db96d56Sopenharmony_ci self.assertEqual(len(groups), 2) 9057db96d56Sopenharmony_ci g = groups[0] 9067db96d56Sopenharmony_ci self.assertEqual(g, 9077db96d56Sopenharmony_ci GroupInfo("gmane.comp.python.distutils.devel", "0000014104", 9087db96d56Sopenharmony_ci "0000000001", "m")) 9097db96d56Sopenharmony_ci 9107db96d56Sopenharmony_ci def test_stat(self): 9117db96d56Sopenharmony_ci resp, art_num, message_id = self.server.stat(3000234) 9127db96d56Sopenharmony_ci self.assertEqual(resp, "223 3000234 <45223423@example.com>") 9137db96d56Sopenharmony_ci self.assertEqual(art_num, 3000234) 9147db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 9157db96d56Sopenharmony_ci resp, art_num, message_id = self.server.stat("<45223423@example.com>") 9167db96d56Sopenharmony_ci self.assertEqual(resp, "223 0 <45223423@example.com>") 9177db96d56Sopenharmony_ci self.assertEqual(art_num, 0) 9187db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 9197db96d56Sopenharmony_ci with 
self.assertRaises(nntplib.NNTPTemporaryError) as cm: 9207db96d56Sopenharmony_ci self.server.stat("<non.existent.id>") 9217db96d56Sopenharmony_ci self.assertEqual(cm.exception.response, "430 No Such Article Found") 9227db96d56Sopenharmony_ci with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 9237db96d56Sopenharmony_ci self.server.stat() 9247db96d56Sopenharmony_ci self.assertEqual(cm.exception.response, "412 No newsgroup selected") 9257db96d56Sopenharmony_ci 9267db96d56Sopenharmony_ci def test_next(self): 9277db96d56Sopenharmony_ci resp, art_num, message_id = self.server.next() 9287db96d56Sopenharmony_ci self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved") 9297db96d56Sopenharmony_ci self.assertEqual(art_num, 3000237) 9307db96d56Sopenharmony_ci self.assertEqual(message_id, "<668929@example.org>") 9317db96d56Sopenharmony_ci 9327db96d56Sopenharmony_ci def test_last(self): 9337db96d56Sopenharmony_ci resp, art_num, message_id = self.server.last() 9347db96d56Sopenharmony_ci self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved") 9357db96d56Sopenharmony_ci self.assertEqual(art_num, 3000234) 9367db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 9377db96d56Sopenharmony_ci 9387db96d56Sopenharmony_ci def test_description(self): 9397db96d56Sopenharmony_ci desc = self.server.description("comp.lang.python") 9407db96d56Sopenharmony_ci self.assertEqual(desc, "The Python computer language.") 9417db96d56Sopenharmony_ci desc = self.server.description("comp.lang.pythonx") 9427db96d56Sopenharmony_ci self.assertEqual(desc, "") 9437db96d56Sopenharmony_ci 9447db96d56Sopenharmony_ci def test_descriptions(self): 9457db96d56Sopenharmony_ci resp, groups = self.server.descriptions("comp.lang.python") 9467db96d56Sopenharmony_ci self.assertEqual(resp, '215 Descriptions in form "group description".') 9477db96d56Sopenharmony_ci self.assertEqual(groups, { 9487db96d56Sopenharmony_ci "comp.lang.python": "The Python computer language.", 
9497db96d56Sopenharmony_ci }) 9507db96d56Sopenharmony_ci resp, groups = self.server.descriptions("comp.lang.python*") 9517db96d56Sopenharmony_ci self.assertEqual(groups, { 9527db96d56Sopenharmony_ci "comp.lang.python": "The Python computer language.", 9537db96d56Sopenharmony_ci "comp.lang.python.announce": "Announcements about the Python language. (Moderated)", 9547db96d56Sopenharmony_ci }) 9557db96d56Sopenharmony_ci resp, groups = self.server.descriptions("comp.lang.pythonx") 9567db96d56Sopenharmony_ci self.assertEqual(groups, {}) 9577db96d56Sopenharmony_ci 9587db96d56Sopenharmony_ci def test_group(self): 9597db96d56Sopenharmony_ci resp, count, first, last, group = self.server.group("fr.comp.lang.python") 9607db96d56Sopenharmony_ci self.assertTrue(resp.startswith("211 "), resp) 9617db96d56Sopenharmony_ci self.assertEqual(first, 761) 9627db96d56Sopenharmony_ci self.assertEqual(last, 1265) 9637db96d56Sopenharmony_ci self.assertEqual(count, 486) 9647db96d56Sopenharmony_ci self.assertEqual(group, "fr.comp.lang.python") 9657db96d56Sopenharmony_ci with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 9667db96d56Sopenharmony_ci self.server.group("comp.lang.python.devel") 9677db96d56Sopenharmony_ci exc = cm.exception 9687db96d56Sopenharmony_ci self.assertTrue(exc.response.startswith("411 No such group"), 9697db96d56Sopenharmony_ci exc.response) 9707db96d56Sopenharmony_ci 9717db96d56Sopenharmony_ci def test_newnews(self): 9727db96d56Sopenharmony_ci # NEWNEWS comp.lang.python [20]100913 082004 9737db96d56Sopenharmony_ci dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 9747db96d56Sopenharmony_ci resp, ids = self.server.newnews("comp.lang.python", dt) 9757db96d56Sopenharmony_ci expected = ( 9767db96d56Sopenharmony_ci "230 list of newsarticles (NNTP v{0}) " 9777db96d56Sopenharmony_ci "created after Mon Sep 13 08:20:04 2010 follows" 9787db96d56Sopenharmony_ci ).format(self.nntp_version) 9797db96d56Sopenharmony_ci self.assertEqual(resp, expected) 9807db96d56Sopenharmony_ci 
self.assertEqual(ids, [ 9817db96d56Sopenharmony_ci "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>", 9827db96d56Sopenharmony_ci "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>", 9837db96d56Sopenharmony_ci ]) 9847db96d56Sopenharmony_ci # NEWNEWS fr.comp.lang.python [20]100913 082004 9857db96d56Sopenharmony_ci dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 9867db96d56Sopenharmony_ci resp, ids = self.server.newnews("fr.comp.lang.python", dt) 9877db96d56Sopenharmony_ci self.assertEqual(resp, "230 An empty list of newsarticles follows") 9887db96d56Sopenharmony_ci self.assertEqual(ids, []) 9897db96d56Sopenharmony_ci 9907db96d56Sopenharmony_ci def _check_article_body(self, lines): 9917db96d56Sopenharmony_ci self.assertEqual(len(lines), 4) 9927db96d56Sopenharmony_ci self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.") 9937db96d56Sopenharmony_ci self.assertEqual(lines[-2], b"") 9947db96d56Sopenharmony_ci self.assertEqual(lines[-3], b".Here is a dot-starting line.") 9957db96d56Sopenharmony_ci self.assertEqual(lines[-4], b"This is just a test article.") 9967db96d56Sopenharmony_ci 9977db96d56Sopenharmony_ci def _check_article_head(self, lines): 9987db96d56Sopenharmony_ci self.assertEqual(len(lines), 4) 9997db96d56Sopenharmony_ci self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>') 10007db96d56Sopenharmony_ci self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>") 10017db96d56Sopenharmony_ci 10027db96d56Sopenharmony_ci def _check_article_data(self, lines): 10037db96d56Sopenharmony_ci self.assertEqual(len(lines), 9) 10047db96d56Sopenharmony_ci self._check_article_head(lines[:4]) 10057db96d56Sopenharmony_ci self._check_article_body(lines[-4:]) 10067db96d56Sopenharmony_ci self.assertEqual(lines[4], b"") 10077db96d56Sopenharmony_ci 10087db96d56Sopenharmony_ci def test_article(self): 10097db96d56Sopenharmony_ci # ARTICLE 10107db96d56Sopenharmony_ci resp, info = self.server.article() 
10117db96d56Sopenharmony_ci self.assertEqual(resp, "220 3000237 <45223423@example.com>") 10127db96d56Sopenharmony_ci art_num, message_id, lines = info 10137db96d56Sopenharmony_ci self.assertEqual(art_num, 3000237) 10147db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 10157db96d56Sopenharmony_ci self._check_article_data(lines) 10167db96d56Sopenharmony_ci # ARTICLE num 10177db96d56Sopenharmony_ci resp, info = self.server.article(3000234) 10187db96d56Sopenharmony_ci self.assertEqual(resp, "220 3000234 <45223423@example.com>") 10197db96d56Sopenharmony_ci art_num, message_id, lines = info 10207db96d56Sopenharmony_ci self.assertEqual(art_num, 3000234) 10217db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 10227db96d56Sopenharmony_ci self._check_article_data(lines) 10237db96d56Sopenharmony_ci # ARTICLE id 10247db96d56Sopenharmony_ci resp, info = self.server.article("<45223423@example.com>") 10257db96d56Sopenharmony_ci self.assertEqual(resp, "220 0 <45223423@example.com>") 10267db96d56Sopenharmony_ci art_num, message_id, lines = info 10277db96d56Sopenharmony_ci self.assertEqual(art_num, 0) 10287db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 10297db96d56Sopenharmony_ci self._check_article_data(lines) 10307db96d56Sopenharmony_ci # Non-existent id 10317db96d56Sopenharmony_ci with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 10327db96d56Sopenharmony_ci self.server.article("<non-existent@example.com>") 10337db96d56Sopenharmony_ci self.assertEqual(cm.exception.response, "430 No Such Article Found") 10347db96d56Sopenharmony_ci 10357db96d56Sopenharmony_ci def test_article_file(self): 10367db96d56Sopenharmony_ci # With a "file" argument 10377db96d56Sopenharmony_ci f = io.BytesIO() 10387db96d56Sopenharmony_ci resp, info = self.server.article(file=f) 10397db96d56Sopenharmony_ci self.assertEqual(resp, "220 3000237 <45223423@example.com>") 10407db96d56Sopenharmony_ci art_num, message_id, 
lines = info 10417db96d56Sopenharmony_ci self.assertEqual(art_num, 3000237) 10427db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 10437db96d56Sopenharmony_ci self.assertEqual(lines, []) 10447db96d56Sopenharmony_ci data = f.getvalue() 10457db96d56Sopenharmony_ci self.assertTrue(data.startswith( 10467db96d56Sopenharmony_ci b'From: "Demo User" <nobody@example.net>\r\n' 10477db96d56Sopenharmony_ci b'Subject: I am just a test article\r\n' 10487db96d56Sopenharmony_ci ), ascii(data)) 10497db96d56Sopenharmony_ci self.assertTrue(data.endswith( 10507db96d56Sopenharmony_ci b'This is just a test article.\r\n' 10517db96d56Sopenharmony_ci b'.Here is a dot-starting line.\r\n' 10527db96d56Sopenharmony_ci b'\r\n' 10537db96d56Sopenharmony_ci b'-- Signed by Andr\xc3\xa9.\r\n' 10547db96d56Sopenharmony_ci ), ascii(data)) 10557db96d56Sopenharmony_ci 10567db96d56Sopenharmony_ci def test_head(self): 10577db96d56Sopenharmony_ci # HEAD 10587db96d56Sopenharmony_ci resp, info = self.server.head() 10597db96d56Sopenharmony_ci self.assertEqual(resp, "221 3000237 <45223423@example.com>") 10607db96d56Sopenharmony_ci art_num, message_id, lines = info 10617db96d56Sopenharmony_ci self.assertEqual(art_num, 3000237) 10627db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 10637db96d56Sopenharmony_ci self._check_article_head(lines) 10647db96d56Sopenharmony_ci # HEAD num 10657db96d56Sopenharmony_ci resp, info = self.server.head(3000234) 10667db96d56Sopenharmony_ci self.assertEqual(resp, "221 3000234 <45223423@example.com>") 10677db96d56Sopenharmony_ci art_num, message_id, lines = info 10687db96d56Sopenharmony_ci self.assertEqual(art_num, 3000234) 10697db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 10707db96d56Sopenharmony_ci self._check_article_head(lines) 10717db96d56Sopenharmony_ci # HEAD id 10727db96d56Sopenharmony_ci resp, info = self.server.head("<45223423@example.com>") 10737db96d56Sopenharmony_ci 
self.assertEqual(resp, "221 0 <45223423@example.com>") 10747db96d56Sopenharmony_ci art_num, message_id, lines = info 10757db96d56Sopenharmony_ci self.assertEqual(art_num, 0) 10767db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 10777db96d56Sopenharmony_ci self._check_article_head(lines) 10787db96d56Sopenharmony_ci # Non-existent id 10797db96d56Sopenharmony_ci with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 10807db96d56Sopenharmony_ci self.server.head("<non-existent@example.com>") 10817db96d56Sopenharmony_ci self.assertEqual(cm.exception.response, "430 No Such Article Found") 10827db96d56Sopenharmony_ci 10837db96d56Sopenharmony_ci def test_head_file(self): 10847db96d56Sopenharmony_ci f = io.BytesIO() 10857db96d56Sopenharmony_ci resp, info = self.server.head(file=f) 10867db96d56Sopenharmony_ci self.assertEqual(resp, "221 3000237 <45223423@example.com>") 10877db96d56Sopenharmony_ci art_num, message_id, lines = info 10887db96d56Sopenharmony_ci self.assertEqual(art_num, 3000237) 10897db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 10907db96d56Sopenharmony_ci self.assertEqual(lines, []) 10917db96d56Sopenharmony_ci data = f.getvalue() 10927db96d56Sopenharmony_ci self.assertTrue(data.startswith( 10937db96d56Sopenharmony_ci b'From: "Demo User" <nobody@example.net>\r\n' 10947db96d56Sopenharmony_ci b'Subject: I am just a test article\r\n' 10957db96d56Sopenharmony_ci ), ascii(data)) 10967db96d56Sopenharmony_ci self.assertFalse(data.endswith( 10977db96d56Sopenharmony_ci b'This is just a test article.\r\n' 10987db96d56Sopenharmony_ci b'.Here is a dot-starting line.\r\n' 10997db96d56Sopenharmony_ci b'\r\n' 11007db96d56Sopenharmony_ci b'-- Signed by Andr\xc3\xa9.\r\n' 11017db96d56Sopenharmony_ci ), ascii(data)) 11027db96d56Sopenharmony_ci 11037db96d56Sopenharmony_ci def test_body(self): 11047db96d56Sopenharmony_ci # BODY 11057db96d56Sopenharmony_ci resp, info = self.server.body() 11067db96d56Sopenharmony_ci 
self.assertEqual(resp, "222 3000237 <45223423@example.com>") 11077db96d56Sopenharmony_ci art_num, message_id, lines = info 11087db96d56Sopenharmony_ci self.assertEqual(art_num, 3000237) 11097db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 11107db96d56Sopenharmony_ci self._check_article_body(lines) 11117db96d56Sopenharmony_ci # BODY num 11127db96d56Sopenharmony_ci resp, info = self.server.body(3000234) 11137db96d56Sopenharmony_ci self.assertEqual(resp, "222 3000234 <45223423@example.com>") 11147db96d56Sopenharmony_ci art_num, message_id, lines = info 11157db96d56Sopenharmony_ci self.assertEqual(art_num, 3000234) 11167db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 11177db96d56Sopenharmony_ci self._check_article_body(lines) 11187db96d56Sopenharmony_ci # BODY id 11197db96d56Sopenharmony_ci resp, info = self.server.body("<45223423@example.com>") 11207db96d56Sopenharmony_ci self.assertEqual(resp, "222 0 <45223423@example.com>") 11217db96d56Sopenharmony_ci art_num, message_id, lines = info 11227db96d56Sopenharmony_ci self.assertEqual(art_num, 0) 11237db96d56Sopenharmony_ci self.assertEqual(message_id, "<45223423@example.com>") 11247db96d56Sopenharmony_ci self._check_article_body(lines) 11257db96d56Sopenharmony_ci # Non-existent id 11267db96d56Sopenharmony_ci with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 11277db96d56Sopenharmony_ci self.server.body("<non-existent@example.com>") 11287db96d56Sopenharmony_ci self.assertEqual(cm.exception.response, "430 No Such Article Found") 11297db96d56Sopenharmony_ci 11307db96d56Sopenharmony_ci def test_body_file(self): 11317db96d56Sopenharmony_ci f = io.BytesIO() 11327db96d56Sopenharmony_ci resp, info = self.server.body(file=f) 11337db96d56Sopenharmony_ci self.assertEqual(resp, "222 3000237 <45223423@example.com>") 11347db96d56Sopenharmony_ci art_num, message_id, lines = info 11357db96d56Sopenharmony_ci self.assertEqual(art_num, 3000237) 11367db96d56Sopenharmony_ci 
self.assertEqual(message_id, "<45223423@example.com>") 11377db96d56Sopenharmony_ci self.assertEqual(lines, []) 11387db96d56Sopenharmony_ci data = f.getvalue() 11397db96d56Sopenharmony_ci self.assertFalse(data.startswith( 11407db96d56Sopenharmony_ci b'From: "Demo User" <nobody@example.net>\r\n' 11417db96d56Sopenharmony_ci b'Subject: I am just a test article\r\n' 11427db96d56Sopenharmony_ci ), ascii(data)) 11437db96d56Sopenharmony_ci self.assertTrue(data.endswith( 11447db96d56Sopenharmony_ci b'This is just a test article.\r\n' 11457db96d56Sopenharmony_ci b'.Here is a dot-starting line.\r\n' 11467db96d56Sopenharmony_ci b'\r\n' 11477db96d56Sopenharmony_ci b'-- Signed by Andr\xc3\xa9.\r\n' 11487db96d56Sopenharmony_ci ), ascii(data)) 11497db96d56Sopenharmony_ci 11507db96d56Sopenharmony_ci def check_over_xover_resp(self, resp, overviews): 11517db96d56Sopenharmony_ci self.assertTrue(resp.startswith("224 "), resp) 11527db96d56Sopenharmony_ci self.assertEqual(len(overviews), 3) 11537db96d56Sopenharmony_ci art_num, over = overviews[0] 11547db96d56Sopenharmony_ci self.assertEqual(art_num, 57) 11557db96d56Sopenharmony_ci self.assertEqual(over, { 11567db96d56Sopenharmony_ci "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>", 11577db96d56Sopenharmony_ci "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout", 11587db96d56Sopenharmony_ci "date": "Sat, 19 Jun 2010 18:04:08 -0400", 11597db96d56Sopenharmony_ci "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>", 11607db96d56Sopenharmony_ci "references": "<hvalf7$ort$1@dough.gmane.org>", 11617db96d56Sopenharmony_ci ":bytes": "7103", 11627db96d56Sopenharmony_ci ":lines": "16", 11637db96d56Sopenharmony_ci "xref": "news.gmane.io gmane.comp.python.authors:57" 11647db96d56Sopenharmony_ci }) 11657db96d56Sopenharmony_ci art_num, over = overviews[1] 11667db96d56Sopenharmony_ci self.assertEqual(over["xref"], None) 11677db96d56Sopenharmony_ci art_num, over = overviews[2] 
11687db96d56Sopenharmony_ci self.assertEqual(over["subject"], 11697db96d56Sopenharmony_ci "Re: Message d'erreur incompréhensible (par moi)") 11707db96d56Sopenharmony_ci 11717db96d56Sopenharmony_ci def test_xover(self): 11727db96d56Sopenharmony_ci resp, overviews = self.server.xover(57, 59) 11737db96d56Sopenharmony_ci self.check_over_xover_resp(resp, overviews) 11747db96d56Sopenharmony_ci 11757db96d56Sopenharmony_ci def test_over(self): 11767db96d56Sopenharmony_ci # In NNTP "v1", this will fallback on XOVER 11777db96d56Sopenharmony_ci resp, overviews = self.server.over((57, 59)) 11787db96d56Sopenharmony_ci self.check_over_xover_resp(resp, overviews) 11797db96d56Sopenharmony_ci 11807db96d56Sopenharmony_ci sample_post = ( 11817db96d56Sopenharmony_ci b'From: "Demo User" <nobody@example.net>\r\n' 11827db96d56Sopenharmony_ci b'Subject: I am just a test article\r\n' 11837db96d56Sopenharmony_ci b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n' 11847db96d56Sopenharmony_ci b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n' 11857db96d56Sopenharmony_ci b'\r\n' 11867db96d56Sopenharmony_ci b'This is just a test article.\r\n' 11877db96d56Sopenharmony_ci b'.Here is a dot-starting line.\r\n' 11887db96d56Sopenharmony_ci b'\r\n' 11897db96d56Sopenharmony_ci b'-- Signed by Andr\xc3\xa9.\r\n' 11907db96d56Sopenharmony_ci ) 11917db96d56Sopenharmony_ci 11927db96d56Sopenharmony_ci def _check_posted_body(self): 11937db96d56Sopenharmony_ci # Check the raw body as received by the server 11947db96d56Sopenharmony_ci lines = self.handler.posted_body 11957db96d56Sopenharmony_ci # One additional line for the "." 
terminator 11967db96d56Sopenharmony_ci self.assertEqual(len(lines), 10) 11977db96d56Sopenharmony_ci self.assertEqual(lines[-1], b'.\r\n') 11987db96d56Sopenharmony_ci self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n') 11997db96d56Sopenharmony_ci self.assertEqual(lines[-3], b'\r\n') 12007db96d56Sopenharmony_ci self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n') 12017db96d56Sopenharmony_ci self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n') 12027db96d56Sopenharmony_ci 12037db96d56Sopenharmony_ci def _check_post_ihave_sub(self, func, *args, file_factory): 12047db96d56Sopenharmony_ci # First the prepared post with CRLF endings 12057db96d56Sopenharmony_ci post = self.sample_post 12067db96d56Sopenharmony_ci func_args = args + (file_factory(post),) 12077db96d56Sopenharmony_ci self.handler.posted_body = None 12087db96d56Sopenharmony_ci resp = func(*func_args) 12097db96d56Sopenharmony_ci self._check_posted_body() 12107db96d56Sopenharmony_ci # Then the same post with "normal" line endings - they should be 12117db96d56Sopenharmony_ci # converted by NNTP.post and NNTP.ihave. 
12127db96d56Sopenharmony_ci post = self.sample_post.replace(b"\r\n", b"\n") 12137db96d56Sopenharmony_ci func_args = args + (file_factory(post),) 12147db96d56Sopenharmony_ci self.handler.posted_body = None 12157db96d56Sopenharmony_ci resp = func(*func_args) 12167db96d56Sopenharmony_ci self._check_posted_body() 12177db96d56Sopenharmony_ci return resp 12187db96d56Sopenharmony_ci 12197db96d56Sopenharmony_ci def check_post_ihave(self, func, success_resp, *args): 12207db96d56Sopenharmony_ci # With a bytes object 12217db96d56Sopenharmony_ci resp = self._check_post_ihave_sub(func, *args, file_factory=bytes) 12227db96d56Sopenharmony_ci self.assertEqual(resp, success_resp) 12237db96d56Sopenharmony_ci # With a bytearray object 12247db96d56Sopenharmony_ci resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray) 12257db96d56Sopenharmony_ci self.assertEqual(resp, success_resp) 12267db96d56Sopenharmony_ci # With a file object 12277db96d56Sopenharmony_ci resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO) 12287db96d56Sopenharmony_ci self.assertEqual(resp, success_resp) 12297db96d56Sopenharmony_ci # With an iterable of terminated lines 12307db96d56Sopenharmony_ci def iterlines(b): 12317db96d56Sopenharmony_ci return iter(b.splitlines(keepends=True)) 12327db96d56Sopenharmony_ci resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 12337db96d56Sopenharmony_ci self.assertEqual(resp, success_resp) 12347db96d56Sopenharmony_ci # With an iterable of non-terminated lines 12357db96d56Sopenharmony_ci def iterlines(b): 12367db96d56Sopenharmony_ci return iter(b.splitlines(keepends=False)) 12377db96d56Sopenharmony_ci resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 12387db96d56Sopenharmony_ci self.assertEqual(resp, success_resp) 12397db96d56Sopenharmony_ci 12407db96d56Sopenharmony_ci def test_post(self): 12417db96d56Sopenharmony_ci self.check_post_ihave(self.server.post, "240 Article received OK") 
12427db96d56Sopenharmony_ci self.handler.allow_posting = False 12437db96d56Sopenharmony_ci with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 12447db96d56Sopenharmony_ci self.server.post(self.sample_post) 12457db96d56Sopenharmony_ci self.assertEqual(cm.exception.response, 12467db96d56Sopenharmony_ci "440 Posting not permitted") 12477db96d56Sopenharmony_ci 12487db96d56Sopenharmony_ci def test_ihave(self): 12497db96d56Sopenharmony_ci self.check_post_ihave(self.server.ihave, "235 Article transferred OK", 12507db96d56Sopenharmony_ci "<i.am.an.article.you.will.want@example.com>") 12517db96d56Sopenharmony_ci with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 12527db96d56Sopenharmony_ci self.server.ihave("<another.message.id>", self.sample_post) 12537db96d56Sopenharmony_ci self.assertEqual(cm.exception.response, 12547db96d56Sopenharmony_ci "435 Article not wanted") 12557db96d56Sopenharmony_ci 12567db96d56Sopenharmony_ci def test_too_long_lines(self): 12577db96d56Sopenharmony_ci dt = datetime.datetime(2010, 1, 1, 9, 0, 0) 12587db96d56Sopenharmony_ci self.assertRaises(nntplib.NNTPDataError, 12597db96d56Sopenharmony_ci self.server.newnews, "comp.lang.python", dt) 12607db96d56Sopenharmony_ci 12617db96d56Sopenharmony_ci 12627db96d56Sopenharmony_ciclass NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): 12637db96d56Sopenharmony_ci """Tests an NNTP v1 server (no capabilities).""" 12647db96d56Sopenharmony_ci 12657db96d56Sopenharmony_ci nntp_version = 1 12667db96d56Sopenharmony_ci handler_class = NNTPv1Handler 12677db96d56Sopenharmony_ci 12687db96d56Sopenharmony_ci def test_caps(self): 12697db96d56Sopenharmony_ci caps = self.server.getcapabilities() 12707db96d56Sopenharmony_ci self.assertEqual(caps, {}) 12717db96d56Sopenharmony_ci self.assertEqual(self.server.nntp_version, 1) 12727db96d56Sopenharmony_ci self.assertEqual(self.server.nntp_implementation, None) 12737db96d56Sopenharmony_ci 12747db96d56Sopenharmony_ci 
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
            })
        self.assertEqual(self.server.nntp_version, 3)
        self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')


class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably NNTP v2 server with capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        self.assertEqual(self.server._caps, {})
        self.server.login('testuser', 'testpw')
        self.assertIn('VERSION', self.server._caps)


class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
        unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        self.assertIn('READER', self.server._caps)


class MiscTests(unittest.TestCase):

    def test_decode_header(self):
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("", "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        # The minimal (default) response
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # The minimal response using alternative names
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # Variations in casing
        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
                 "References:", "BYTES:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # First example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines", "Xref:full",
                 "Distribution:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # Second example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:FULL",
                 "Distribution:FULL"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # A classic response from INN
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref"])

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        # Test non-legacy mode
        # 1) with a datetime
        def gives(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
        # 2) with a date
        def gives(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives(1999, 6, 23, "19990623", "000000")
        gives(2000, 6, 23, "20000623", "000000")
        gives(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        def gives(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
        gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
        gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
        # 2) with a date
        def gives(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives(1999, 6, 23, "990623", "000000")
        gives(2000, 6, 23, "000623", "000000")
        gives(2010, 6, 5, "100605", "000000")

14797db96d56Sopenharmony_ci @unittest.skipUnless(ssl, 'requires SSL support') 14807db96d56Sopenharmony_ci def test_ssl_support(self): 14817db96d56Sopenharmony_ci self.assertTrue(hasattr(nntplib, 'NNTP_SSL')) 14827db96d56Sopenharmony_ci 14837db96d56Sopenharmony_ci 14847db96d56Sopenharmony_ciclass PublicAPITests(unittest.TestCase): 14857db96d56Sopenharmony_ci """Ensures that the correct values are exposed in the public API.""" 14867db96d56Sopenharmony_ci 14877db96d56Sopenharmony_ci def test_module_all_attribute(self): 14887db96d56Sopenharmony_ci self.assertTrue(hasattr(nntplib, '__all__')) 14897db96d56Sopenharmony_ci target_api = ['NNTP', 'NNTPError', 'NNTPReplyError', 14907db96d56Sopenharmony_ci 'NNTPTemporaryError', 'NNTPPermanentError', 14917db96d56Sopenharmony_ci 'NNTPProtocolError', 'NNTPDataError', 'decode_header'] 14927db96d56Sopenharmony_ci if ssl is not None: 14937db96d56Sopenharmony_ci target_api.append('NNTP_SSL') 14947db96d56Sopenharmony_ci self.assertEqual(set(nntplib.__all__), set(target_api)) 14957db96d56Sopenharmony_ci 14967db96d56Sopenharmony_ciclass MockSocketTests(unittest.TestCase): 14977db96d56Sopenharmony_ci """Tests involving a mock socket object 14987db96d56Sopenharmony_ci 14997db96d56Sopenharmony_ci Used where the _NNTPServerIO file object is not enough.""" 15007db96d56Sopenharmony_ci 15017db96d56Sopenharmony_ci nntp_class = nntplib.NNTP 15027db96d56Sopenharmony_ci 15037db96d56Sopenharmony_ci def check_constructor_error_conditions( 15047db96d56Sopenharmony_ci self, handler_class, 15057db96d56Sopenharmony_ci expected_error_type, expected_error_msg, 15067db96d56Sopenharmony_ci login=None, password=None): 15077db96d56Sopenharmony_ci 15087db96d56Sopenharmony_ci class mock_socket_module: 15097db96d56Sopenharmony_ci def create_connection(address, timeout): 15107db96d56Sopenharmony_ci return MockSocket() 15117db96d56Sopenharmony_ci 15127db96d56Sopenharmony_ci class MockSocket: 15137db96d56Sopenharmony_ci def close(self): 15147db96d56Sopenharmony_ci 
nonlocal socket_closed 15157db96d56Sopenharmony_ci socket_closed = True 15167db96d56Sopenharmony_ci 15177db96d56Sopenharmony_ci def makefile(socket, mode): 15187db96d56Sopenharmony_ci handler = handler_class() 15197db96d56Sopenharmony_ci _, file = make_mock_file(handler) 15207db96d56Sopenharmony_ci files.append(file) 15217db96d56Sopenharmony_ci return file 15227db96d56Sopenharmony_ci 15237db96d56Sopenharmony_ci socket_closed = False 15247db96d56Sopenharmony_ci files = [] 15257db96d56Sopenharmony_ci with patch('nntplib.socket', mock_socket_module), \ 15267db96d56Sopenharmony_ci self.assertRaisesRegex(expected_error_type, expected_error_msg): 15277db96d56Sopenharmony_ci self.nntp_class('dummy', user=login, password=password) 15287db96d56Sopenharmony_ci self.assertTrue(socket_closed) 15297db96d56Sopenharmony_ci for f in files: 15307db96d56Sopenharmony_ci self.assertTrue(f.closed) 15317db96d56Sopenharmony_ci 15327db96d56Sopenharmony_ci def test_bad_welcome(self): 15337db96d56Sopenharmony_ci #Test a bad welcome message 15347db96d56Sopenharmony_ci class Handler(NNTPv1Handler): 15357db96d56Sopenharmony_ci welcome = 'Bad Welcome' 15367db96d56Sopenharmony_ci self.check_constructor_error_conditions( 15377db96d56Sopenharmony_ci Handler, nntplib.NNTPProtocolError, Handler.welcome) 15387db96d56Sopenharmony_ci 15397db96d56Sopenharmony_ci def test_service_temporarily_unavailable(self): 15407db96d56Sopenharmony_ci #Test service temporarily unavailable 15417db96d56Sopenharmony_ci class Handler(NNTPv1Handler): 15427db96d56Sopenharmony_ci welcome = '400 Service temporarily unavailable' 15437db96d56Sopenharmony_ci self.check_constructor_error_conditions( 15447db96d56Sopenharmony_ci Handler, nntplib.NNTPTemporaryError, Handler.welcome) 15457db96d56Sopenharmony_ci 15467db96d56Sopenharmony_ci def test_service_permanently_unavailable(self): 15477db96d56Sopenharmony_ci #Test service permanently unavailable 15487db96d56Sopenharmony_ci class Handler(NNTPv1Handler): 
15497db96d56Sopenharmony_ci welcome = '502 Service permanently unavailable' 15507db96d56Sopenharmony_ci self.check_constructor_error_conditions( 15517db96d56Sopenharmony_ci Handler, nntplib.NNTPPermanentError, Handler.welcome) 15527db96d56Sopenharmony_ci 15537db96d56Sopenharmony_ci def test_bad_capabilities(self): 15547db96d56Sopenharmony_ci #Test a bad capabilities response 15557db96d56Sopenharmony_ci class Handler(NNTPv1Handler): 15567db96d56Sopenharmony_ci def handle_CAPABILITIES(self): 15577db96d56Sopenharmony_ci self.push_lit(capabilities_response) 15587db96d56Sopenharmony_ci capabilities_response = '201 bad capability' 15597db96d56Sopenharmony_ci self.check_constructor_error_conditions( 15607db96d56Sopenharmony_ci Handler, nntplib.NNTPReplyError, capabilities_response) 15617db96d56Sopenharmony_ci 15627db96d56Sopenharmony_ci def test_login_aborted(self): 15637db96d56Sopenharmony_ci #Test a bad authinfo response 15647db96d56Sopenharmony_ci login = 't@e.com' 15657db96d56Sopenharmony_ci password = 'python' 15667db96d56Sopenharmony_ci class Handler(NNTPv1Handler): 15677db96d56Sopenharmony_ci def handle_AUTHINFO(self, *args): 15687db96d56Sopenharmony_ci self.push_lit(authinfo_response) 15697db96d56Sopenharmony_ci authinfo_response = '503 Mechanism not recognized' 15707db96d56Sopenharmony_ci self.check_constructor_error_conditions( 15717db96d56Sopenharmony_ci Handler, nntplib.NNTPPermanentError, authinfo_response, 15727db96d56Sopenharmony_ci login, password) 15737db96d56Sopenharmony_ci 15747db96d56Sopenharmony_ciclass bypass_context: 15757db96d56Sopenharmony_ci """Bypass encryption and actual SSL module""" 15767db96d56Sopenharmony_ci def wrap_socket(sock, **args): 15777db96d56Sopenharmony_ci return sock 15787db96d56Sopenharmony_ci 15797db96d56Sopenharmony_ci@unittest.skipUnless(ssl, 'requires SSL support') 15807db96d56Sopenharmony_ciclass MockSslTests(MockSocketTests): 15817db96d56Sopenharmony_ci @staticmethod 15827db96d56Sopenharmony_ci def nntp_class(*pos, **kw): 
15837db96d56Sopenharmony_ci return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw) 15847db96d56Sopenharmony_ci 15857db96d56Sopenharmony_ci 15867db96d56Sopenharmony_ciclass LocalServerTests(unittest.TestCase): 15877db96d56Sopenharmony_ci def setUp(self): 15887db96d56Sopenharmony_ci sock = socket.socket() 15897db96d56Sopenharmony_ci port = socket_helper.bind_port(sock) 15907db96d56Sopenharmony_ci sock.listen() 15917db96d56Sopenharmony_ci self.background = threading.Thread( 15927db96d56Sopenharmony_ci target=self.run_server, args=(sock,)) 15937db96d56Sopenharmony_ci self.background.start() 15947db96d56Sopenharmony_ci self.addCleanup(self.background.join) 15957db96d56Sopenharmony_ci 15967db96d56Sopenharmony_ci self.nntp = self.enterContext(NNTP(socket_helper.HOST, port, usenetrc=False)) 15977db96d56Sopenharmony_ci 15987db96d56Sopenharmony_ci def run_server(self, sock): 15997db96d56Sopenharmony_ci # Could be generalized to handle more commands in separate methods 16007db96d56Sopenharmony_ci with sock: 16017db96d56Sopenharmony_ci [client, _] = sock.accept() 16027db96d56Sopenharmony_ci with contextlib.ExitStack() as cleanup: 16037db96d56Sopenharmony_ci cleanup.enter_context(client) 16047db96d56Sopenharmony_ci reader = cleanup.enter_context(client.makefile('rb')) 16057db96d56Sopenharmony_ci client.sendall(b'200 Server ready\r\n') 16067db96d56Sopenharmony_ci while True: 16077db96d56Sopenharmony_ci cmd = reader.readline() 16087db96d56Sopenharmony_ci if cmd == b'CAPABILITIES\r\n': 16097db96d56Sopenharmony_ci client.sendall( 16107db96d56Sopenharmony_ci b'101 Capability list:\r\n' 16117db96d56Sopenharmony_ci b'VERSION 2\r\n' 16127db96d56Sopenharmony_ci b'STARTTLS\r\n' 16137db96d56Sopenharmony_ci b'.\r\n' 16147db96d56Sopenharmony_ci ) 16157db96d56Sopenharmony_ci elif cmd == b'STARTTLS\r\n': 16167db96d56Sopenharmony_ci reader.close() 16177db96d56Sopenharmony_ci client.sendall(b'382 Begin TLS negotiation now\r\n') 16187db96d56Sopenharmony_ci context = 
ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 16197db96d56Sopenharmony_ci context.load_cert_chain(certfile) 16207db96d56Sopenharmony_ci client = context.wrap_socket( 16217db96d56Sopenharmony_ci client, server_side=True) 16227db96d56Sopenharmony_ci cleanup.enter_context(client) 16237db96d56Sopenharmony_ci reader = cleanup.enter_context(client.makefile('rb')) 16247db96d56Sopenharmony_ci elif cmd == b'QUIT\r\n': 16257db96d56Sopenharmony_ci client.sendall(b'205 Bye!\r\n') 16267db96d56Sopenharmony_ci break 16277db96d56Sopenharmony_ci else: 16287db96d56Sopenharmony_ci raise ValueError('Unexpected command {!r}'.format(cmd)) 16297db96d56Sopenharmony_ci 16307db96d56Sopenharmony_ci @unittest.skipUnless(ssl, 'requires SSL support') 16317db96d56Sopenharmony_ci def test_starttls(self): 16327db96d56Sopenharmony_ci file = self.nntp.file 16337db96d56Sopenharmony_ci sock = self.nntp.sock 16347db96d56Sopenharmony_ci self.nntp.starttls() 16357db96d56Sopenharmony_ci # Check that the socket and internal pseudo-file really were 16367db96d56Sopenharmony_ci # changed. 16377db96d56Sopenharmony_ci self.assertNotEqual(file, self.nntp.file) 16387db96d56Sopenharmony_ci self.assertNotEqual(sock, self.nntp.sock) 16397db96d56Sopenharmony_ci # Check that the new socket really is an SSL one 16407db96d56Sopenharmony_ci self.assertIsInstance(self.nntp.sock, ssl.SSLSocket) 16417db96d56Sopenharmony_ci # Check that trying starttls when it's already active fails. 16427db96d56Sopenharmony_ci self.assertRaises(ValueError, self.nntp.starttls) 16437db96d56Sopenharmony_ci 16447db96d56Sopenharmony_ci 16457db96d56Sopenharmony_ciif __name__ == "__main__": 16467db96d56Sopenharmony_ci unittest.main() 1647