17db96d56Sopenharmony_ciimport io
27db96d56Sopenharmony_ciimport socket
37db96d56Sopenharmony_ciimport datetime
47db96d56Sopenharmony_ciimport textwrap
57db96d56Sopenharmony_ciimport unittest
67db96d56Sopenharmony_ciimport functools
77db96d56Sopenharmony_ciimport contextlib
87db96d56Sopenharmony_ciimport os.path
97db96d56Sopenharmony_ciimport re
107db96d56Sopenharmony_ciimport threading
117db96d56Sopenharmony_ci
127db96d56Sopenharmony_cifrom test import support
137db96d56Sopenharmony_cifrom test.support import socket_helper, warnings_helper
147db96d56Sopenharmony_cinntplib = warnings_helper.import_deprecated("nntplib")
157db96d56Sopenharmony_cifrom nntplib import NNTP, GroupInfo
167db96d56Sopenharmony_cifrom unittest.mock import patch
177db96d56Sopenharmony_citry:
187db96d56Sopenharmony_ci    import ssl
197db96d56Sopenharmony_ciexcept ImportError:
207db96d56Sopenharmony_ci    ssl = None
217db96d56Sopenharmony_ci
227db96d56Sopenharmony_ci
237db96d56Sopenharmony_cicertfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')
247db96d56Sopenharmony_ci
257db96d56Sopenharmony_ciif ssl is not None:
267db96d56Sopenharmony_ci    SSLError = ssl.SSLError
277db96d56Sopenharmony_cielse:
287db96d56Sopenharmony_ci    class SSLError(Exception):
297db96d56Sopenharmony_ci        """Non-existent exception class when we lack SSL support."""
307db96d56Sopenharmony_ci        reason = "This will never be raised."
317db96d56Sopenharmony_ci
327db96d56Sopenharmony_ci# TODO:
337db96d56Sopenharmony_ci# - test the `file` arg to more commands
347db96d56Sopenharmony_ci# - test error conditions
357db96d56Sopenharmony_ci# - test auth and `usenetrc`
367db96d56Sopenharmony_ci
377db96d56Sopenharmony_ci
387db96d56Sopenharmony_ciclass NetworkedNNTPTestsMixin:
397db96d56Sopenharmony_ci
407db96d56Sopenharmony_ci    ssl_context = None
417db96d56Sopenharmony_ci
427db96d56Sopenharmony_ci    def test_welcome(self):
437db96d56Sopenharmony_ci        welcome = self.server.getwelcome()
447db96d56Sopenharmony_ci        self.assertEqual(str, type(welcome))
457db96d56Sopenharmony_ci
467db96d56Sopenharmony_ci    def test_help(self):
477db96d56Sopenharmony_ci        resp, lines = self.server.help()
487db96d56Sopenharmony_ci        self.assertTrue(resp.startswith("100 "), resp)
497db96d56Sopenharmony_ci        for line in lines:
507db96d56Sopenharmony_ci            self.assertEqual(str, type(line))
517db96d56Sopenharmony_ci
527db96d56Sopenharmony_ci    def test_list(self):
537db96d56Sopenharmony_ci        resp, groups = self.server.list()
547db96d56Sopenharmony_ci        if len(groups) > 0:
557db96d56Sopenharmony_ci            self.assertEqual(GroupInfo, type(groups[0]))
567db96d56Sopenharmony_ci            self.assertEqual(str, type(groups[0].group))
577db96d56Sopenharmony_ci
587db96d56Sopenharmony_ci    def test_list_active(self):
597db96d56Sopenharmony_ci        resp, groups = self.server.list(self.GROUP_PAT)
607db96d56Sopenharmony_ci        if len(groups) > 0:
617db96d56Sopenharmony_ci            self.assertEqual(GroupInfo, type(groups[0]))
627db96d56Sopenharmony_ci            self.assertEqual(str, type(groups[0].group))
637db96d56Sopenharmony_ci
647db96d56Sopenharmony_ci    def test_unknown_command(self):
657db96d56Sopenharmony_ci        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
667db96d56Sopenharmony_ci            self.server._shortcmd("XYZZY")
677db96d56Sopenharmony_ci        resp = cm.exception.response
687db96d56Sopenharmony_ci        self.assertTrue(resp.startswith("500 "), resp)
697db96d56Sopenharmony_ci
707db96d56Sopenharmony_ci    def test_newgroups(self):
717db96d56Sopenharmony_ci        # gmane gets a constant influx of new groups.  In order not to stress
727db96d56Sopenharmony_ci        # the server too much, we choose a recent date in the past.
737db96d56Sopenharmony_ci        dt = datetime.date.today() - datetime.timedelta(days=7)
747db96d56Sopenharmony_ci        resp, groups = self.server.newgroups(dt)
757db96d56Sopenharmony_ci        if len(groups) > 0:
767db96d56Sopenharmony_ci            self.assertIsInstance(groups[0], GroupInfo)
777db96d56Sopenharmony_ci            self.assertIsInstance(groups[0].group, str)
787db96d56Sopenharmony_ci
797db96d56Sopenharmony_ci    def test_description(self):
807db96d56Sopenharmony_ci        def _check_desc(desc):
817db96d56Sopenharmony_ci            # Sanity checks
827db96d56Sopenharmony_ci            self.assertIsInstance(desc, str)
837db96d56Sopenharmony_ci            self.assertNotIn(self.GROUP_NAME, desc)
847db96d56Sopenharmony_ci        desc = self.server.description(self.GROUP_NAME)
857db96d56Sopenharmony_ci        _check_desc(desc)
867db96d56Sopenharmony_ci        # Another sanity check
877db96d56Sopenharmony_ci        self.assertIn(self.DESC, desc)
887db96d56Sopenharmony_ci        # With a pattern
897db96d56Sopenharmony_ci        desc = self.server.description(self.GROUP_PAT)
907db96d56Sopenharmony_ci        _check_desc(desc)
917db96d56Sopenharmony_ci        # Shouldn't exist
927db96d56Sopenharmony_ci        desc = self.server.description("zk.brrtt.baz")
937db96d56Sopenharmony_ci        self.assertEqual(desc, '')
947db96d56Sopenharmony_ci
957db96d56Sopenharmony_ci    def test_descriptions(self):
967db96d56Sopenharmony_ci        resp, descs = self.server.descriptions(self.GROUP_PAT)
977db96d56Sopenharmony_ci        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
987db96d56Sopenharmony_ci        self.assertTrue(
997db96d56Sopenharmony_ci            resp.startswith("215 ") or resp.startswith("282 "), resp)
1007db96d56Sopenharmony_ci        self.assertIsInstance(descs, dict)
1017db96d56Sopenharmony_ci        desc = descs[self.GROUP_NAME]
1027db96d56Sopenharmony_ci        self.assertEqual(desc, self.server.description(self.GROUP_NAME))
1037db96d56Sopenharmony_ci
1047db96d56Sopenharmony_ci    def test_group(self):
1057db96d56Sopenharmony_ci        result = self.server.group(self.GROUP_NAME)
1067db96d56Sopenharmony_ci        self.assertEqual(5, len(result))
1077db96d56Sopenharmony_ci        resp, count, first, last, group = result
1087db96d56Sopenharmony_ci        self.assertEqual(group, self.GROUP_NAME)
1097db96d56Sopenharmony_ci        self.assertIsInstance(count, int)
1107db96d56Sopenharmony_ci        self.assertIsInstance(first, int)
1117db96d56Sopenharmony_ci        self.assertIsInstance(last, int)
1127db96d56Sopenharmony_ci        self.assertLessEqual(first, last)
1137db96d56Sopenharmony_ci        self.assertTrue(resp.startswith("211 "), resp)
1147db96d56Sopenharmony_ci
1157db96d56Sopenharmony_ci    def test_date(self):
1167db96d56Sopenharmony_ci        resp, date = self.server.date()
1177db96d56Sopenharmony_ci        self.assertIsInstance(date, datetime.datetime)
1187db96d56Sopenharmony_ci        # Sanity check
1197db96d56Sopenharmony_ci        self.assertGreaterEqual(date.year, 1995)
1207db96d56Sopenharmony_ci        self.assertLessEqual(date.year, 2030)
1217db96d56Sopenharmony_ci
1227db96d56Sopenharmony_ci    def _check_art_dict(self, art_dict):
1237db96d56Sopenharmony_ci        # Some sanity checks for a field dictionary returned by OVER / XOVER
1247db96d56Sopenharmony_ci        self.assertIsInstance(art_dict, dict)
1257db96d56Sopenharmony_ci        # NNTP has 7 mandatory fields
1267db96d56Sopenharmony_ci        self.assertGreaterEqual(art_dict.keys(),
1277db96d56Sopenharmony_ci            {"subject", "from", "date", "message-id",
1287db96d56Sopenharmony_ci             "references", ":bytes", ":lines"}
1297db96d56Sopenharmony_ci            )
1307db96d56Sopenharmony_ci        for v in art_dict.values():
1317db96d56Sopenharmony_ci            self.assertIsInstance(v, (str, type(None)))
1327db96d56Sopenharmony_ci
1337db96d56Sopenharmony_ci    def test_xover(self):
1347db96d56Sopenharmony_ci        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
1357db96d56Sopenharmony_ci        resp, lines = self.server.xover(last - 5, last)
1367db96d56Sopenharmony_ci        if len(lines) == 0:
1377db96d56Sopenharmony_ci            self.skipTest("no articles retrieved")
1387db96d56Sopenharmony_ci        # The 'last' article is not necessarily part of the output (cancelled?)
1397db96d56Sopenharmony_ci        art_num, art_dict = lines[0]
1407db96d56Sopenharmony_ci        self.assertGreaterEqual(art_num, last - 5)
1417db96d56Sopenharmony_ci        self.assertLessEqual(art_num, last)
1427db96d56Sopenharmony_ci        self._check_art_dict(art_dict)
1437db96d56Sopenharmony_ci
1447db96d56Sopenharmony_ci    @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
1457db96d56Sopenharmony_ci                           ' is found for issue #28971')
1467db96d56Sopenharmony_ci    def test_over(self):
1477db96d56Sopenharmony_ci        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
1487db96d56Sopenharmony_ci        start = last - 10
1497db96d56Sopenharmony_ci        # The "start-" article range form
1507db96d56Sopenharmony_ci        resp, lines = self.server.over((start, None))
1517db96d56Sopenharmony_ci        art_num, art_dict = lines[0]
1527db96d56Sopenharmony_ci        self._check_art_dict(art_dict)
1537db96d56Sopenharmony_ci        # The "start-end" article range form
1547db96d56Sopenharmony_ci        resp, lines = self.server.over((start, last))
1557db96d56Sopenharmony_ci        art_num, art_dict = lines[-1]
1567db96d56Sopenharmony_ci        # The 'last' article is not necessarily part of the output (cancelled?)
1577db96d56Sopenharmony_ci        self.assertGreaterEqual(art_num, start)
1587db96d56Sopenharmony_ci        self.assertLessEqual(art_num, last)
1597db96d56Sopenharmony_ci        self._check_art_dict(art_dict)
1607db96d56Sopenharmony_ci        # XXX The "message_id" form is unsupported by gmane
1617db96d56Sopenharmony_ci        # 503 Overview by message-ID unsupported
1627db96d56Sopenharmony_ci
1637db96d56Sopenharmony_ci    def test_xhdr(self):
1647db96d56Sopenharmony_ci        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
1657db96d56Sopenharmony_ci        resp, lines = self.server.xhdr('subject', last)
1667db96d56Sopenharmony_ci        for line in lines:
1677db96d56Sopenharmony_ci            self.assertEqual(str, type(line[1]))
1687db96d56Sopenharmony_ci
1697db96d56Sopenharmony_ci    def check_article_resp(self, resp, article, art_num=None):
1707db96d56Sopenharmony_ci        self.assertIsInstance(article, nntplib.ArticleInfo)
1717db96d56Sopenharmony_ci        if art_num is not None:
1727db96d56Sopenharmony_ci            self.assertEqual(article.number, art_num)
1737db96d56Sopenharmony_ci        for line in article.lines:
1747db96d56Sopenharmony_ci            self.assertIsInstance(line, bytes)
1757db96d56Sopenharmony_ci        # XXX this could exceptionally happen...
1767db96d56Sopenharmony_ci        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
1777db96d56Sopenharmony_ci
1787db96d56Sopenharmony_ci    @unittest.skipIf(True, "FIXME: see bpo-32128")
1797db96d56Sopenharmony_ci    def test_article_head_body(self):
1807db96d56Sopenharmony_ci        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
1817db96d56Sopenharmony_ci        # Try to find an available article
1827db96d56Sopenharmony_ci        for art_num in (last, first, last - 1):
1837db96d56Sopenharmony_ci            try:
1847db96d56Sopenharmony_ci                resp, head = self.server.head(art_num)
1857db96d56Sopenharmony_ci            except nntplib.NNTPTemporaryError as e:
1867db96d56Sopenharmony_ci                if not e.response.startswith("423 "):
1877db96d56Sopenharmony_ci                    raise
1887db96d56Sopenharmony_ci                # "423 No such article" => choose another one
1897db96d56Sopenharmony_ci                continue
1907db96d56Sopenharmony_ci            break
1917db96d56Sopenharmony_ci        else:
1927db96d56Sopenharmony_ci            self.skipTest("could not find a suitable article number")
1937db96d56Sopenharmony_ci        self.assertTrue(resp.startswith("221 "), resp)
1947db96d56Sopenharmony_ci        self.check_article_resp(resp, head, art_num)
1957db96d56Sopenharmony_ci        resp, body = self.server.body(art_num)
1967db96d56Sopenharmony_ci        self.assertTrue(resp.startswith("222 "), resp)
1977db96d56Sopenharmony_ci        self.check_article_resp(resp, body, art_num)
1987db96d56Sopenharmony_ci        resp, article = self.server.article(art_num)
1997db96d56Sopenharmony_ci        self.assertTrue(resp.startswith("220 "), resp)
2007db96d56Sopenharmony_ci        self.check_article_resp(resp, article, art_num)
2017db96d56Sopenharmony_ci        # Tolerate running the tests from behind a NNTP virus checker
2027db96d56Sopenharmony_ci        denylist = lambda line: line.startswith(b'X-Antivirus')
2037db96d56Sopenharmony_ci        filtered_head_lines = [line for line in head.lines
2047db96d56Sopenharmony_ci                               if not denylist(line)]
2057db96d56Sopenharmony_ci        filtered_lines = [line for line in article.lines
2067db96d56Sopenharmony_ci                          if not denylist(line)]
2077db96d56Sopenharmony_ci        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
2087db96d56Sopenharmony_ci
2097db96d56Sopenharmony_ci    def test_capabilities(self):
2107db96d56Sopenharmony_ci        # The server under test implements NNTP version 2 and has a
2117db96d56Sopenharmony_ci        # couple of well-known capabilities. Just sanity check that we
2127db96d56Sopenharmony_ci        # got them.
2137db96d56Sopenharmony_ci        def _check_caps(caps):
2147db96d56Sopenharmony_ci            caps_list = caps['LIST']
2157db96d56Sopenharmony_ci            self.assertIsInstance(caps_list, (list, tuple))
2167db96d56Sopenharmony_ci            self.assertIn('OVERVIEW.FMT', caps_list)
2177db96d56Sopenharmony_ci        self.assertGreaterEqual(self.server.nntp_version, 2)
2187db96d56Sopenharmony_ci        _check_caps(self.server.getcapabilities())
2197db96d56Sopenharmony_ci        # This re-emits the command
2207db96d56Sopenharmony_ci        resp, caps = self.server.capabilities()
2217db96d56Sopenharmony_ci        _check_caps(caps)
2227db96d56Sopenharmony_ci
2237db96d56Sopenharmony_ci    def test_zlogin(self):
2247db96d56Sopenharmony_ci        # This test must be the penultimate because further commands will be
2257db96d56Sopenharmony_ci        # refused.
2267db96d56Sopenharmony_ci        baduser = "notarealuser"
2277db96d56Sopenharmony_ci        badpw = "notarealpassword"
2287db96d56Sopenharmony_ci        # Check that bogus credentials cause failure
2297db96d56Sopenharmony_ci        self.assertRaises(nntplib.NNTPError, self.server.login,
2307db96d56Sopenharmony_ci                          user=baduser, password=badpw, usenetrc=False)
2317db96d56Sopenharmony_ci        # FIXME: We should check that correct credentials succeed, but that
2327db96d56Sopenharmony_ci        # would require valid details for some server somewhere to be in the
2337db96d56Sopenharmony_ci        # test suite, I think. Gmane is anonymous, at least as used for the
2347db96d56Sopenharmony_ci        # other tests.
2357db96d56Sopenharmony_ci
2367db96d56Sopenharmony_ci    def test_zzquit(self):
2377db96d56Sopenharmony_ci        # This test must be called last, hence the name
2387db96d56Sopenharmony_ci        cls = type(self)
2397db96d56Sopenharmony_ci        try:
2407db96d56Sopenharmony_ci            self.server.quit()
2417db96d56Sopenharmony_ci        finally:
2427db96d56Sopenharmony_ci            cls.server = None
2437db96d56Sopenharmony_ci
2447db96d56Sopenharmony_ci    @classmethod
2457db96d56Sopenharmony_ci    def wrap_methods(cls):
2467db96d56Sopenharmony_ci        # Wrap all methods in a transient_internet() exception catcher
2477db96d56Sopenharmony_ci        # XXX put a generic version in test.support?
2487db96d56Sopenharmony_ci        def wrap_meth(meth):
2497db96d56Sopenharmony_ci            @functools.wraps(meth)
2507db96d56Sopenharmony_ci            def wrapped(self):
2517db96d56Sopenharmony_ci                with socket_helper.transient_internet(self.NNTP_HOST):
2527db96d56Sopenharmony_ci                    meth(self)
2537db96d56Sopenharmony_ci            return wrapped
2547db96d56Sopenharmony_ci        for name in dir(cls):
2557db96d56Sopenharmony_ci            if not name.startswith('test_'):
2567db96d56Sopenharmony_ci                continue
2577db96d56Sopenharmony_ci            meth = getattr(cls, name)
2587db96d56Sopenharmony_ci            if not callable(meth):
2597db96d56Sopenharmony_ci                continue
2607db96d56Sopenharmony_ci            # Need to use a closure so that meth remains bound to its current
2617db96d56Sopenharmony_ci            # value
2627db96d56Sopenharmony_ci            setattr(cls, name, wrap_meth(meth))
2637db96d56Sopenharmony_ci
2647db96d56Sopenharmony_ci    def test_timeout(self):
2657db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
2667db96d56Sopenharmony_ci            self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)
2677db96d56Sopenharmony_ci
2687db96d56Sopenharmony_ci    def test_with_statement(self):
2697db96d56Sopenharmony_ci        def is_connected():
2707db96d56Sopenharmony_ci            if not hasattr(server, 'file'):
2717db96d56Sopenharmony_ci                return False
2727db96d56Sopenharmony_ci            try:
2737db96d56Sopenharmony_ci                server.help()
2747db96d56Sopenharmony_ci            except (OSError, EOFError):
2757db96d56Sopenharmony_ci                return False
2767db96d56Sopenharmony_ci            return True
2777db96d56Sopenharmony_ci
2787db96d56Sopenharmony_ci        kwargs = dict(
2797db96d56Sopenharmony_ci            timeout=support.INTERNET_TIMEOUT,
2807db96d56Sopenharmony_ci            usenetrc=False
2817db96d56Sopenharmony_ci        )
2827db96d56Sopenharmony_ci        if self.ssl_context is not None:
2837db96d56Sopenharmony_ci            kwargs["ssl_context"] = self.ssl_context
2847db96d56Sopenharmony_ci
2857db96d56Sopenharmony_ci        try:
2867db96d56Sopenharmony_ci            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
2877db96d56Sopenharmony_ci            with server:
2887db96d56Sopenharmony_ci                self.assertTrue(is_connected())
2897db96d56Sopenharmony_ci                self.assertTrue(server.help())
2907db96d56Sopenharmony_ci            self.assertFalse(is_connected())
2917db96d56Sopenharmony_ci
2927db96d56Sopenharmony_ci            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
2937db96d56Sopenharmony_ci            with server:
2947db96d56Sopenharmony_ci                server.quit()
2957db96d56Sopenharmony_ci            self.assertFalse(is_connected())
2967db96d56Sopenharmony_ci        except SSLError as ssl_err:
2977db96d56Sopenharmony_ci            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
2987db96d56Sopenharmony_ci            if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
2997db96d56Sopenharmony_ci                raise unittest.SkipTest(f"Got {ssl_err} connecting "
3007db96d56Sopenharmony_ci                                        f"to {self.NNTP_HOST!r}")
3017db96d56Sopenharmony_ci            raise
3027db96d56Sopenharmony_ci
3037db96d56Sopenharmony_ci
3047db96d56Sopenharmony_ciNetworkedNNTPTestsMixin.wrap_methods()
3057db96d56Sopenharmony_ci
3067db96d56Sopenharmony_ci
3077db96d56Sopenharmony_ciEOF_ERRORS = (EOFError,)
3087db96d56Sopenharmony_ciif ssl is not None:
3097db96d56Sopenharmony_ci    EOF_ERRORS += (ssl.SSLEOFError,)
3107db96d56Sopenharmony_ci
3117db96d56Sopenharmony_ci
3127db96d56Sopenharmony_ciclass NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
3137db96d56Sopenharmony_ci    # This server supports STARTTLS (gmane doesn't)
3147db96d56Sopenharmony_ci    NNTP_HOST = 'news.trigofacile.com'
3157db96d56Sopenharmony_ci    GROUP_NAME = 'fr.comp.lang.python'
3167db96d56Sopenharmony_ci    GROUP_PAT = 'fr.comp.lang.*'
3177db96d56Sopenharmony_ci    DESC = 'Python'
3187db96d56Sopenharmony_ci
3197db96d56Sopenharmony_ci    NNTP_CLASS = NNTP
3207db96d56Sopenharmony_ci
3217db96d56Sopenharmony_ci    @classmethod
3227db96d56Sopenharmony_ci    def setUpClass(cls):
3237db96d56Sopenharmony_ci        support.requires("network")
3247db96d56Sopenharmony_ci        kwargs = dict(
3257db96d56Sopenharmony_ci            timeout=support.INTERNET_TIMEOUT,
3267db96d56Sopenharmony_ci            usenetrc=False
3277db96d56Sopenharmony_ci        )
3287db96d56Sopenharmony_ci        if cls.ssl_context is not None:
3297db96d56Sopenharmony_ci            kwargs["ssl_context"] = cls.ssl_context
3307db96d56Sopenharmony_ci        with socket_helper.transient_internet(cls.NNTP_HOST):
3317db96d56Sopenharmony_ci            try:
3327db96d56Sopenharmony_ci                cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, **kwargs)
3337db96d56Sopenharmony_ci            except SSLError as ssl_err:
3347db96d56Sopenharmony_ci                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
3357db96d56Sopenharmony_ci                if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
3367db96d56Sopenharmony_ci                    raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
3377db96d56Sopenharmony_ci                                            f"to {cls.NNTP_HOST!r}")
3387db96d56Sopenharmony_ci                print(cls.NNTP_HOST)
3397db96d56Sopenharmony_ci                raise
3407db96d56Sopenharmony_ci            except EOF_ERRORS:
3417db96d56Sopenharmony_ci                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
3427db96d56Sopenharmony_ci                                        f"to {cls.NNTP_HOST!r}")
3437db96d56Sopenharmony_ci
3447db96d56Sopenharmony_ci    @classmethod
3457db96d56Sopenharmony_ci    def tearDownClass(cls):
3467db96d56Sopenharmony_ci        if cls.server is not None:
3477db96d56Sopenharmony_ci            cls.server.quit()
3487db96d56Sopenharmony_ci
3497db96d56Sopenharmony_ci@unittest.skipUnless(ssl, 'requires SSL support')
3507db96d56Sopenharmony_ciclass NetworkedNNTP_SSLTests(NetworkedNNTPTests):
3517db96d56Sopenharmony_ci
3527db96d56Sopenharmony_ci    # Technical limits for this public NNTP server (see http://www.aioe.org):
3537db96d56Sopenharmony_ci    # "Only two concurrent connections per IP address are allowed and
3547db96d56Sopenharmony_ci    # 400 connections per day are accepted from each IP address."
3557db96d56Sopenharmony_ci
3567db96d56Sopenharmony_ci    NNTP_HOST = 'nntp.aioe.org'
3577db96d56Sopenharmony_ci    # bpo-42794: aioe.test is one of the official groups on this server
3587db96d56Sopenharmony_ci    # used for testing: https://news.aioe.org/manual/aioe-hierarchy/
3597db96d56Sopenharmony_ci    GROUP_NAME = 'aioe.test'
3607db96d56Sopenharmony_ci    GROUP_PAT = 'aioe.*'
3617db96d56Sopenharmony_ci    DESC = 'test'
3627db96d56Sopenharmony_ci
3637db96d56Sopenharmony_ci    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
3647db96d56Sopenharmony_ci
3657db96d56Sopenharmony_ci    # Disabled as it produces too much data
3667db96d56Sopenharmony_ci    test_list = None
3677db96d56Sopenharmony_ci
3687db96d56Sopenharmony_ci    # Disabled as the connection will already be encrypted.
3697db96d56Sopenharmony_ci    test_starttls = None
3707db96d56Sopenharmony_ci
3717db96d56Sopenharmony_ci    if ssl is not None:
3727db96d56Sopenharmony_ci        ssl_context = ssl._create_unverified_context()
3737db96d56Sopenharmony_ci        ssl_context.set_ciphers("DEFAULT")
3747db96d56Sopenharmony_ci        ssl_context.maximum_version = ssl.TLSVersion.TLSv1_2
3757db96d56Sopenharmony_ci
3767db96d56Sopenharmony_ci#
3777db96d56Sopenharmony_ci# Non-networked tests using a local server (or something mocking it).
3787db96d56Sopenharmony_ci#
3797db96d56Sopenharmony_ci
3807db96d56Sopenharmony_ciclass _NNTPServerIO(io.RawIOBase):
3817db96d56Sopenharmony_ci    """A raw IO object allowing NNTP commands to be received and processed
3827db96d56Sopenharmony_ci    by a handler.  The handler can push responses which can then be read
3837db96d56Sopenharmony_ci    from the IO object."""
3847db96d56Sopenharmony_ci
3857db96d56Sopenharmony_ci    def __init__(self, handler):
3867db96d56Sopenharmony_ci        io.RawIOBase.__init__(self)
3877db96d56Sopenharmony_ci        # The channel from the client
3887db96d56Sopenharmony_ci        self.c2s = io.BytesIO()
3897db96d56Sopenharmony_ci        # The channel to the client
3907db96d56Sopenharmony_ci        self.s2c = io.BytesIO()
3917db96d56Sopenharmony_ci        self.handler = handler
3927db96d56Sopenharmony_ci        self.handler.start(self.c2s.readline, self.push_data)
3937db96d56Sopenharmony_ci
3947db96d56Sopenharmony_ci    def readable(self):
3957db96d56Sopenharmony_ci        return True
3967db96d56Sopenharmony_ci
3977db96d56Sopenharmony_ci    def writable(self):
3987db96d56Sopenharmony_ci        return True
3997db96d56Sopenharmony_ci
4007db96d56Sopenharmony_ci    def push_data(self, data):
4017db96d56Sopenharmony_ci        """Push (buffer) some data to send to the client."""
4027db96d56Sopenharmony_ci        pos = self.s2c.tell()
4037db96d56Sopenharmony_ci        self.s2c.seek(0, 2)
4047db96d56Sopenharmony_ci        self.s2c.write(data)
4057db96d56Sopenharmony_ci        self.s2c.seek(pos)
4067db96d56Sopenharmony_ci
4077db96d56Sopenharmony_ci    def write(self, b):
4087db96d56Sopenharmony_ci        """The client sends us some data"""
4097db96d56Sopenharmony_ci        pos = self.c2s.tell()
4107db96d56Sopenharmony_ci        self.c2s.write(b)
4117db96d56Sopenharmony_ci        self.c2s.seek(pos)
4127db96d56Sopenharmony_ci        self.handler.process_pending()
4137db96d56Sopenharmony_ci        return len(b)
4147db96d56Sopenharmony_ci
4157db96d56Sopenharmony_ci    def readinto(self, buf):
4167db96d56Sopenharmony_ci        """The client wants to read a response"""
4177db96d56Sopenharmony_ci        self.handler.process_pending()
4187db96d56Sopenharmony_ci        b = self.s2c.read(len(buf))
4197db96d56Sopenharmony_ci        n = len(b)
4207db96d56Sopenharmony_ci        buf[:n] = b
4217db96d56Sopenharmony_ci        return n
4227db96d56Sopenharmony_ci
4237db96d56Sopenharmony_ci
4247db96d56Sopenharmony_cidef make_mock_file(handler):
4257db96d56Sopenharmony_ci    sio = _NNTPServerIO(handler)
4267db96d56Sopenharmony_ci    # Using BufferedRWPair instead of BufferedRandom ensures the file
4277db96d56Sopenharmony_ci    # isn't seekable.
4287db96d56Sopenharmony_ci    file = io.BufferedRWPair(sio, sio)
4297db96d56Sopenharmony_ci    return (sio, file)
4307db96d56Sopenharmony_ci
4317db96d56Sopenharmony_ci
4327db96d56Sopenharmony_ciclass NNTPServer(nntplib.NNTP):
4337db96d56Sopenharmony_ci
4347db96d56Sopenharmony_ci    def __init__(self, f, host, readermode=None):
4357db96d56Sopenharmony_ci        self.file = f
4367db96d56Sopenharmony_ci        self.host = host
4377db96d56Sopenharmony_ci        self._base_init(readermode)
4387db96d56Sopenharmony_ci
4397db96d56Sopenharmony_ci    def _close(self):
4407db96d56Sopenharmony_ci        self.file.close()
4417db96d56Sopenharmony_ci        del self.file
4427db96d56Sopenharmony_ci
4437db96d56Sopenharmony_ci
4447db96d56Sopenharmony_ciclass MockedNNTPTestsMixin:
4457db96d56Sopenharmony_ci    # Override in derived classes
4467db96d56Sopenharmony_ci    handler_class = None
4477db96d56Sopenharmony_ci
4487db96d56Sopenharmony_ci    def setUp(self):
4497db96d56Sopenharmony_ci        super().setUp()
4507db96d56Sopenharmony_ci        self.make_server()
4517db96d56Sopenharmony_ci
4527db96d56Sopenharmony_ci    def tearDown(self):
4537db96d56Sopenharmony_ci        super().tearDown()
4547db96d56Sopenharmony_ci        del self.server
4557db96d56Sopenharmony_ci
4567db96d56Sopenharmony_ci    def make_server(self, *args, **kwargs):
4577db96d56Sopenharmony_ci        self.handler = self.handler_class()
4587db96d56Sopenharmony_ci        self.sio, file = make_mock_file(self.handler)
4597db96d56Sopenharmony_ci        self.server = NNTPServer(file, 'test.server', *args, **kwargs)
4607db96d56Sopenharmony_ci        return self.server
4617db96d56Sopenharmony_ci
4627db96d56Sopenharmony_ci
4637db96d56Sopenharmony_ciclass MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
4647db96d56Sopenharmony_ci    def setUp(self):
4657db96d56Sopenharmony_ci        super().setUp()
4667db96d56Sopenharmony_ci        self.make_server(readermode=True)
4677db96d56Sopenharmony_ci
4687db96d56Sopenharmony_ci
4697db96d56Sopenharmony_ciclass NNTPv1Handler:
4707db96d56Sopenharmony_ci    """A handler for RFC 977"""
4717db96d56Sopenharmony_ci
4727db96d56Sopenharmony_ci    welcome = "200 NNTP mock server"
4737db96d56Sopenharmony_ci
4747db96d56Sopenharmony_ci    def start(self, readline, push_data):
4757db96d56Sopenharmony_ci        self.in_body = False
4767db96d56Sopenharmony_ci        self.allow_posting = True
4777db96d56Sopenharmony_ci        self._readline = readline
4787db96d56Sopenharmony_ci        self._push_data = push_data
4797db96d56Sopenharmony_ci        self._logged_in = False
4807db96d56Sopenharmony_ci        self._user_sent = False
4817db96d56Sopenharmony_ci        # Our welcome
4827db96d56Sopenharmony_ci        self.handle_welcome()
4837db96d56Sopenharmony_ci
4847db96d56Sopenharmony_ci    def _decode(self, data):
4857db96d56Sopenharmony_ci        return str(data, "utf-8", "surrogateescape")
4867db96d56Sopenharmony_ci
4877db96d56Sopenharmony_ci    def process_pending(self):
4887db96d56Sopenharmony_ci        if self.in_body:
4897db96d56Sopenharmony_ci            while True:
4907db96d56Sopenharmony_ci                line = self._readline()
4917db96d56Sopenharmony_ci                if not line:
4927db96d56Sopenharmony_ci                    return
4937db96d56Sopenharmony_ci                self.body.append(line)
4947db96d56Sopenharmony_ci                if line == b".\r\n":
4957db96d56Sopenharmony_ci                    break
4967db96d56Sopenharmony_ci            try:
4977db96d56Sopenharmony_ci                meth, tokens = self.body_callback
4987db96d56Sopenharmony_ci                meth(*tokens, body=self.body)
4997db96d56Sopenharmony_ci            finally:
5007db96d56Sopenharmony_ci                self.body_callback = None
5017db96d56Sopenharmony_ci                self.body = None
5027db96d56Sopenharmony_ci                self.in_body = False
5037db96d56Sopenharmony_ci        while True:
5047db96d56Sopenharmony_ci            line = self._decode(self._readline())
5057db96d56Sopenharmony_ci            if not line:
5067db96d56Sopenharmony_ci                return
5077db96d56Sopenharmony_ci            if not line.endswith("\r\n"):
5087db96d56Sopenharmony_ci                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
5097db96d56Sopenharmony_ci            line = line[:-2]
5107db96d56Sopenharmony_ci            cmd, *tokens = line.split()
5117db96d56Sopenharmony_ci            #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
5127db96d56Sopenharmony_ci            meth = getattr(self, "handle_" + cmd.upper(), None)
5137db96d56Sopenharmony_ci            if meth is None:
5147db96d56Sopenharmony_ci                self.handle_unknown()
5157db96d56Sopenharmony_ci            else:
5167db96d56Sopenharmony_ci                try:
5177db96d56Sopenharmony_ci                    meth(*tokens)
5187db96d56Sopenharmony_ci                except Exception as e:
5197db96d56Sopenharmony_ci                    raise ValueError("command failed: {!r}".format(line)) from e
5207db96d56Sopenharmony_ci                else:
5217db96d56Sopenharmony_ci                    if self.in_body:
5227db96d56Sopenharmony_ci                        self.body_callback = meth, tokens
5237db96d56Sopenharmony_ci                        self.body = []
5247db96d56Sopenharmony_ci
5257db96d56Sopenharmony_ci    def expect_body(self):
5267db96d56Sopenharmony_ci        """Flag that the client is expected to post a request body"""
5277db96d56Sopenharmony_ci        self.in_body = True
5287db96d56Sopenharmony_ci
5297db96d56Sopenharmony_ci    def push_data(self, data):
5307db96d56Sopenharmony_ci        """Push some binary data"""
5317db96d56Sopenharmony_ci        self._push_data(data)
5327db96d56Sopenharmony_ci
5337db96d56Sopenharmony_ci    def push_lit(self, lit):
5347db96d56Sopenharmony_ci        """Push a string literal"""
5357db96d56Sopenharmony_ci        lit = textwrap.dedent(lit)
5367db96d56Sopenharmony_ci        lit = "\r\n".join(lit.splitlines()) + "\r\n"
5377db96d56Sopenharmony_ci        lit = lit.encode('utf-8')
5387db96d56Sopenharmony_ci        self.push_data(lit)
5397db96d56Sopenharmony_ci
5407db96d56Sopenharmony_ci    def handle_unknown(self):
5417db96d56Sopenharmony_ci        self.push_lit("500 What?")
5427db96d56Sopenharmony_ci
5437db96d56Sopenharmony_ci    def handle_welcome(self):
5447db96d56Sopenharmony_ci        self.push_lit(self.welcome)
5457db96d56Sopenharmony_ci
5467db96d56Sopenharmony_ci    def handle_QUIT(self):
5477db96d56Sopenharmony_ci        self.push_lit("205 Bye!")
5487db96d56Sopenharmony_ci
5497db96d56Sopenharmony_ci    def handle_DATE(self):
5507db96d56Sopenharmony_ci        self.push_lit("111 20100914001155")
5517db96d56Sopenharmony_ci
5527db96d56Sopenharmony_ci    def handle_GROUP(self, group):
5537db96d56Sopenharmony_ci        if group == "fr.comp.lang.python":
5547db96d56Sopenharmony_ci            self.push_lit("211 486 761 1265 fr.comp.lang.python")
5557db96d56Sopenharmony_ci        else:
5567db96d56Sopenharmony_ci            self.push_lit("411 No such group {}".format(group))
5577db96d56Sopenharmony_ci
5587db96d56Sopenharmony_ci    def handle_HELP(self):
5597db96d56Sopenharmony_ci        self.push_lit("""\
5607db96d56Sopenharmony_ci            100 Legal commands
5617db96d56Sopenharmony_ci              authinfo user Name|pass Password|generic <prog> <args>
5627db96d56Sopenharmony_ci              date
5637db96d56Sopenharmony_ci              help
5647db96d56Sopenharmony_ci            Report problems to <root@example.org>
5657db96d56Sopenharmony_ci            .""")
5667db96d56Sopenharmony_ci
5677db96d56Sopenharmony_ci    def handle_STAT(self, message_spec=None):
5687db96d56Sopenharmony_ci        if message_spec is None:
5697db96d56Sopenharmony_ci            self.push_lit("412 No newsgroup selected")
5707db96d56Sopenharmony_ci        elif message_spec == "3000234":
5717db96d56Sopenharmony_ci            self.push_lit("223 3000234 <45223423@example.com>")
5727db96d56Sopenharmony_ci        elif message_spec == "<45223423@example.com>":
5737db96d56Sopenharmony_ci            self.push_lit("223 0 <45223423@example.com>")
5747db96d56Sopenharmony_ci        else:
5757db96d56Sopenharmony_ci            self.push_lit("430 No Such Article Found")
5767db96d56Sopenharmony_ci
5777db96d56Sopenharmony_ci    def handle_NEXT(self):
5787db96d56Sopenharmony_ci        self.push_lit("223 3000237 <668929@example.org> retrieved")
5797db96d56Sopenharmony_ci
5807db96d56Sopenharmony_ci    def handle_LAST(self):
5817db96d56Sopenharmony_ci        self.push_lit("223 3000234 <45223423@example.com> retrieved")
5827db96d56Sopenharmony_ci
5837db96d56Sopenharmony_ci    def handle_LIST(self, action=None, param=None):
5847db96d56Sopenharmony_ci        if action is None:
5857db96d56Sopenharmony_ci            self.push_lit("""\
5867db96d56Sopenharmony_ci                215 Newsgroups in form "group high low flags".
5877db96d56Sopenharmony_ci                comp.lang.python 0000052340 0000002828 y
5887db96d56Sopenharmony_ci                comp.lang.python.announce 0000001153 0000000993 m
5897db96d56Sopenharmony_ci                free.it.comp.lang.python 0000000002 0000000002 y
5907db96d56Sopenharmony_ci                fr.comp.lang.python 0000001254 0000000760 y
5917db96d56Sopenharmony_ci                free.it.comp.lang.python.learner 0000000000 0000000001 y
5927db96d56Sopenharmony_ci                tw.bbs.comp.lang.python 0000000304 0000000304 y
5937db96d56Sopenharmony_ci                .""")
5947db96d56Sopenharmony_ci        elif action == "ACTIVE":
5957db96d56Sopenharmony_ci            if param == "*distutils*":
5967db96d56Sopenharmony_ci                self.push_lit("""\
5977db96d56Sopenharmony_ci                    215 Newsgroups in form "group high low flags"
5987db96d56Sopenharmony_ci                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
5997db96d56Sopenharmony_ci                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
6007db96d56Sopenharmony_ci                    .""")
6017db96d56Sopenharmony_ci            else:
6027db96d56Sopenharmony_ci                self.push_lit("""\
6037db96d56Sopenharmony_ci                    215 Newsgroups in form "group high low flags"
6047db96d56Sopenharmony_ci                    .""")
6057db96d56Sopenharmony_ci        elif action == "OVERVIEW.FMT":
6067db96d56Sopenharmony_ci            self.push_lit("""\
6077db96d56Sopenharmony_ci                215 Order of fields in overview database.
6087db96d56Sopenharmony_ci                Subject:
6097db96d56Sopenharmony_ci                From:
6107db96d56Sopenharmony_ci                Date:
6117db96d56Sopenharmony_ci                Message-ID:
6127db96d56Sopenharmony_ci                References:
6137db96d56Sopenharmony_ci                Bytes:
6147db96d56Sopenharmony_ci                Lines:
6157db96d56Sopenharmony_ci                Xref:full
6167db96d56Sopenharmony_ci                .""")
6177db96d56Sopenharmony_ci        elif action == "NEWSGROUPS":
6187db96d56Sopenharmony_ci            assert param is not None
6197db96d56Sopenharmony_ci            if param == "comp.lang.python":
6207db96d56Sopenharmony_ci                self.push_lit("""\
6217db96d56Sopenharmony_ci                    215 Descriptions in form "group description".
6227db96d56Sopenharmony_ci                    comp.lang.python\tThe Python computer language.
6237db96d56Sopenharmony_ci                    .""")
6247db96d56Sopenharmony_ci            elif param == "comp.lang.python*":
6257db96d56Sopenharmony_ci                self.push_lit("""\
6267db96d56Sopenharmony_ci                    215 Descriptions in form "group description".
6277db96d56Sopenharmony_ci                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
6287db96d56Sopenharmony_ci                    comp.lang.python\tThe Python computer language.
6297db96d56Sopenharmony_ci                    .""")
6307db96d56Sopenharmony_ci            else:
6317db96d56Sopenharmony_ci                self.push_lit("""\
6327db96d56Sopenharmony_ci                    215 Descriptions in form "group description".
6337db96d56Sopenharmony_ci                    .""")
6347db96d56Sopenharmony_ci        else:
6357db96d56Sopenharmony_ci            self.push_lit('501 Unknown LIST keyword')
6367db96d56Sopenharmony_ci
6377db96d56Sopenharmony_ci    def handle_NEWNEWS(self, group, date_str, time_str):
6387db96d56Sopenharmony_ci        # We hard code different return messages depending on passed
6397db96d56Sopenharmony_ci        # argument and date syntax.
6407db96d56Sopenharmony_ci        if (group == "comp.lang.python" and date_str == "20100913"
6417db96d56Sopenharmony_ci            and time_str == "082004"):
6427db96d56Sopenharmony_ci            # Date was passed in RFC 3977 format (NNTP "v2")
6437db96d56Sopenharmony_ci            self.push_lit("""\
6447db96d56Sopenharmony_ci                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
6457db96d56Sopenharmony_ci                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
6467db96d56Sopenharmony_ci                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
6477db96d56Sopenharmony_ci                .""")
6487db96d56Sopenharmony_ci        elif (group == "comp.lang.python" and date_str == "100913"
6497db96d56Sopenharmony_ci            and time_str == "082004"):
6507db96d56Sopenharmony_ci            # Date was passed in RFC 977 format (NNTP "v1")
6517db96d56Sopenharmony_ci            self.push_lit("""\
6527db96d56Sopenharmony_ci                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
6537db96d56Sopenharmony_ci                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
6547db96d56Sopenharmony_ci                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
6557db96d56Sopenharmony_ci                .""")
6567db96d56Sopenharmony_ci        elif (group == 'comp.lang.python' and
6577db96d56Sopenharmony_ci              date_str in ('20100101', '100101') and
6587db96d56Sopenharmony_ci              time_str == '090000'):
6597db96d56Sopenharmony_ci            self.push_lit('too long line' * 3000 +
6607db96d56Sopenharmony_ci                          '\n.')
6617db96d56Sopenharmony_ci        else:
6627db96d56Sopenharmony_ci            self.push_lit("""\
6637db96d56Sopenharmony_ci                230 An empty list of newsarticles follows
6647db96d56Sopenharmony_ci                .""")
6657db96d56Sopenharmony_ci        # (Note for experiments: many servers disable NEWNEWS.
6667db96d56Sopenharmony_ci        #  As of this writing, sicinfo3.epfl.ch doesn't.)
6677db96d56Sopenharmony_ci
6687db96d56Sopenharmony_ci    def handle_XOVER(self, message_spec):
6697db96d56Sopenharmony_ci        if message_spec == "57-59":
6707db96d56Sopenharmony_ci            self.push_lit(
6717db96d56Sopenharmony_ci                "224 Overview information for 57-58 follows\n"
6727db96d56Sopenharmony_ci                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
6737db96d56Sopenharmony_ci                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
6747db96d56Sopenharmony_ci                    "\tSat, 19 Jun 2010 18:04:08 -0400"
6757db96d56Sopenharmony_ci                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
6767db96d56Sopenharmony_ci                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
6777db96d56Sopenharmony_ci                    "\tXref: news.gmane.io gmane.comp.python.authors:57"
6787db96d56Sopenharmony_ci                    "\n"
6797db96d56Sopenharmony_ci                "58\tLooking for a few good bloggers"
6807db96d56Sopenharmony_ci                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
6817db96d56Sopenharmony_ci                    "\tThu, 22 Jul 2010 09:14:14 -0400"
6827db96d56Sopenharmony_ci                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
6837db96d56Sopenharmony_ci                    "\t\t6683\t16"
6847db96d56Sopenharmony_ci                    "\t"
6857db96d56Sopenharmony_ci                    "\n"
6867db96d56Sopenharmony_ci                # A UTF-8 overview line from fr.comp.lang.python
6877db96d56Sopenharmony_ci                "59\tRe: Message d'erreur incompréhensible (par moi)"
6887db96d56Sopenharmony_ci                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
6897db96d56Sopenharmony_ci                    "\tWed, 15 Sep 2010 18:09:15 +0200"
6907db96d56Sopenharmony_ci                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
6917db96d56Sopenharmony_ci                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
6927db96d56Sopenharmony_ci                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
6937db96d56Sopenharmony_ci                    "\n"
6947db96d56Sopenharmony_ci                ".\n")
6957db96d56Sopenharmony_ci        else:
6967db96d56Sopenharmony_ci            self.push_lit("""\
6977db96d56Sopenharmony_ci                224 No articles
6987db96d56Sopenharmony_ci                .""")
6997db96d56Sopenharmony_ci
7007db96d56Sopenharmony_ci    def handle_POST(self, *, body=None):
7017db96d56Sopenharmony_ci        if body is None:
7027db96d56Sopenharmony_ci            if self.allow_posting:
7037db96d56Sopenharmony_ci                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
7047db96d56Sopenharmony_ci                self.expect_body()
7057db96d56Sopenharmony_ci            else:
7067db96d56Sopenharmony_ci                self.push_lit("440 Posting not permitted")
7077db96d56Sopenharmony_ci        else:
7087db96d56Sopenharmony_ci            assert self.allow_posting
7097db96d56Sopenharmony_ci            self.push_lit("240 Article received OK")
7107db96d56Sopenharmony_ci            self.posted_body = body
7117db96d56Sopenharmony_ci
7127db96d56Sopenharmony_ci    def handle_IHAVE(self, message_id, *, body=None):
7137db96d56Sopenharmony_ci        if body is None:
7147db96d56Sopenharmony_ci            if (self.allow_posting and
7157db96d56Sopenharmony_ci                message_id == "<i.am.an.article.you.will.want@example.com>"):
7167db96d56Sopenharmony_ci                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
7177db96d56Sopenharmony_ci                self.expect_body()
7187db96d56Sopenharmony_ci            else:
7197db96d56Sopenharmony_ci                self.push_lit("435 Article not wanted")
7207db96d56Sopenharmony_ci        else:
7217db96d56Sopenharmony_ci            assert self.allow_posting
7227db96d56Sopenharmony_ci            self.push_lit("235 Article transferred OK")
7237db96d56Sopenharmony_ci            self.posted_body = body
7247db96d56Sopenharmony_ci
7257db96d56Sopenharmony_ci    sample_head = """\
7267db96d56Sopenharmony_ci        From: "Demo User" <nobody@example.net>
7277db96d56Sopenharmony_ci        Subject: I am just a test article
7287db96d56Sopenharmony_ci        Content-Type: text/plain; charset=UTF-8; format=flowed
7297db96d56Sopenharmony_ci        Message-ID: <i.am.an.article.you.will.want@example.com>"""
7307db96d56Sopenharmony_ci
7317db96d56Sopenharmony_ci    sample_body = """\
7327db96d56Sopenharmony_ci        This is just a test article.
7337db96d56Sopenharmony_ci        ..Here is a dot-starting line.
7347db96d56Sopenharmony_ci
7357db96d56Sopenharmony_ci        -- Signed by Andr\xe9."""
7367db96d56Sopenharmony_ci
7377db96d56Sopenharmony_ci    sample_article = sample_head + "\n\n" + sample_body
7387db96d56Sopenharmony_ci
7397db96d56Sopenharmony_ci    def handle_ARTICLE(self, message_spec=None):
7407db96d56Sopenharmony_ci        if message_spec is None:
7417db96d56Sopenharmony_ci            self.push_lit("220 3000237 <45223423@example.com>")
7427db96d56Sopenharmony_ci        elif message_spec == "<45223423@example.com>":
7437db96d56Sopenharmony_ci            self.push_lit("220 0 <45223423@example.com>")
7447db96d56Sopenharmony_ci        elif message_spec == "3000234":
7457db96d56Sopenharmony_ci            self.push_lit("220 3000234 <45223423@example.com>")
7467db96d56Sopenharmony_ci        else:
7477db96d56Sopenharmony_ci            self.push_lit("430 No Such Article Found")
7487db96d56Sopenharmony_ci            return
7497db96d56Sopenharmony_ci        self.push_lit(self.sample_article)
7507db96d56Sopenharmony_ci        self.push_lit(".")
7517db96d56Sopenharmony_ci
7527db96d56Sopenharmony_ci    def handle_HEAD(self, message_spec=None):
7537db96d56Sopenharmony_ci        if message_spec is None:
7547db96d56Sopenharmony_ci            self.push_lit("221 3000237 <45223423@example.com>")
7557db96d56Sopenharmony_ci        elif message_spec == "<45223423@example.com>":
7567db96d56Sopenharmony_ci            self.push_lit("221 0 <45223423@example.com>")
7577db96d56Sopenharmony_ci        elif message_spec == "3000234":
7587db96d56Sopenharmony_ci            self.push_lit("221 3000234 <45223423@example.com>")
7597db96d56Sopenharmony_ci        else:
7607db96d56Sopenharmony_ci            self.push_lit("430 No Such Article Found")
7617db96d56Sopenharmony_ci            return
7627db96d56Sopenharmony_ci        self.push_lit(self.sample_head)
7637db96d56Sopenharmony_ci        self.push_lit(".")
7647db96d56Sopenharmony_ci
7657db96d56Sopenharmony_ci    def handle_BODY(self, message_spec=None):
7667db96d56Sopenharmony_ci        if message_spec is None:
7677db96d56Sopenharmony_ci            self.push_lit("222 3000237 <45223423@example.com>")
7687db96d56Sopenharmony_ci        elif message_spec == "<45223423@example.com>":
7697db96d56Sopenharmony_ci            self.push_lit("222 0 <45223423@example.com>")
7707db96d56Sopenharmony_ci        elif message_spec == "3000234":
7717db96d56Sopenharmony_ci            self.push_lit("222 3000234 <45223423@example.com>")
7727db96d56Sopenharmony_ci        else:
7737db96d56Sopenharmony_ci            self.push_lit("430 No Such Article Found")
7747db96d56Sopenharmony_ci            return
7757db96d56Sopenharmony_ci        self.push_lit(self.sample_body)
7767db96d56Sopenharmony_ci        self.push_lit(".")
7777db96d56Sopenharmony_ci
7787db96d56Sopenharmony_ci    def handle_AUTHINFO(self, cred_type, data):
7797db96d56Sopenharmony_ci        if self._logged_in:
7807db96d56Sopenharmony_ci            self.push_lit('502 Already Logged In')
7817db96d56Sopenharmony_ci        elif cred_type == 'user':
7827db96d56Sopenharmony_ci            if self._user_sent:
7837db96d56Sopenharmony_ci                self.push_lit('482 User Credential Already Sent')
7847db96d56Sopenharmony_ci            else:
7857db96d56Sopenharmony_ci                self.push_lit('381 Password Required')
7867db96d56Sopenharmony_ci                self._user_sent = True
7877db96d56Sopenharmony_ci        elif cred_type == 'pass':
7887db96d56Sopenharmony_ci            self.push_lit('281 Login Successful')
7897db96d56Sopenharmony_ci            self._logged_in = True
7907db96d56Sopenharmony_ci        else:
7917db96d56Sopenharmony_ci            raise Exception('Unknown cred type {}'.format(cred_type))
7927db96d56Sopenharmony_ci
7937db96d56Sopenharmony_ci
7947db96d56Sopenharmony_ciclass NNTPv2Handler(NNTPv1Handler):
7957db96d56Sopenharmony_ci    """A handler for RFC 3977 (NNTP "v2")"""
7967db96d56Sopenharmony_ci
7977db96d56Sopenharmony_ci    def handle_CAPABILITIES(self):
7987db96d56Sopenharmony_ci        fmt = """\
7997db96d56Sopenharmony_ci            101 Capability list:
8007db96d56Sopenharmony_ci            VERSION 2 3
8017db96d56Sopenharmony_ci            IMPLEMENTATION INN 2.5.1{}
8027db96d56Sopenharmony_ci            HDR
8037db96d56Sopenharmony_ci            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
8047db96d56Sopenharmony_ci            OVER
8057db96d56Sopenharmony_ci            POST
8067db96d56Sopenharmony_ci            READER
8077db96d56Sopenharmony_ci            ."""
8087db96d56Sopenharmony_ci
8097db96d56Sopenharmony_ci        if not self._logged_in:
8107db96d56Sopenharmony_ci            self.push_lit(fmt.format('\n            AUTHINFO USER'))
8117db96d56Sopenharmony_ci        else:
8127db96d56Sopenharmony_ci            self.push_lit(fmt.format(''))
8137db96d56Sopenharmony_ci
8147db96d56Sopenharmony_ci    def handle_MODE(self, _):
8157db96d56Sopenharmony_ci        raise Exception('MODE READER sent despite READER has been advertised')
8167db96d56Sopenharmony_ci
8177db96d56Sopenharmony_ci    def handle_OVER(self, message_spec=None):
8187db96d56Sopenharmony_ci        return self.handle_XOVER(message_spec)
8197db96d56Sopenharmony_ci
8207db96d56Sopenharmony_ci
8217db96d56Sopenharmony_ciclass CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
8227db96d56Sopenharmony_ci    """A handler that allows CAPABILITIES only after login"""
8237db96d56Sopenharmony_ci
8247db96d56Sopenharmony_ci    def handle_CAPABILITIES(self):
8257db96d56Sopenharmony_ci        if not self._logged_in:
8267db96d56Sopenharmony_ci            self.push_lit('480 You must log in.')
8277db96d56Sopenharmony_ci        else:
8287db96d56Sopenharmony_ci            super().handle_CAPABILITIES()
8297db96d56Sopenharmony_ci
8307db96d56Sopenharmony_ci
8317db96d56Sopenharmony_ciclass ModeSwitchingNNTPv2Handler(NNTPv2Handler):
8327db96d56Sopenharmony_ci    """A server that starts in transit mode"""
8337db96d56Sopenharmony_ci
8347db96d56Sopenharmony_ci    def __init__(self):
8357db96d56Sopenharmony_ci        self._switched = False
8367db96d56Sopenharmony_ci
8377db96d56Sopenharmony_ci    def handle_CAPABILITIES(self):
8387db96d56Sopenharmony_ci        fmt = """\
8397db96d56Sopenharmony_ci            101 Capability list:
8407db96d56Sopenharmony_ci            VERSION 2 3
8417db96d56Sopenharmony_ci            IMPLEMENTATION INN 2.5.1
8427db96d56Sopenharmony_ci            HDR
8437db96d56Sopenharmony_ci            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
8447db96d56Sopenharmony_ci            OVER
8457db96d56Sopenharmony_ci            POST
8467db96d56Sopenharmony_ci            {}READER
8477db96d56Sopenharmony_ci            ."""
8487db96d56Sopenharmony_ci        if self._switched:
8497db96d56Sopenharmony_ci            self.push_lit(fmt.format(''))
8507db96d56Sopenharmony_ci        else:
8517db96d56Sopenharmony_ci            self.push_lit(fmt.format('MODE-'))
8527db96d56Sopenharmony_ci
8537db96d56Sopenharmony_ci    def handle_MODE(self, what):
8547db96d56Sopenharmony_ci        assert not self._switched and what == 'reader'
8557db96d56Sopenharmony_ci        self._switched = True
8567db96d56Sopenharmony_ci        self.push_lit('200 Posting allowed')
8577db96d56Sopenharmony_ci
8587db96d56Sopenharmony_ci
8597db96d56Sopenharmony_ciclass NNTPv1v2TestsMixin:
8607db96d56Sopenharmony_ci
8617db96d56Sopenharmony_ci    def setUp(self):
8627db96d56Sopenharmony_ci        super().setUp()
8637db96d56Sopenharmony_ci
8647db96d56Sopenharmony_ci    def test_welcome(self):
8657db96d56Sopenharmony_ci        self.assertEqual(self.server.welcome, self.handler.welcome)
8667db96d56Sopenharmony_ci
8677db96d56Sopenharmony_ci    def test_authinfo(self):
8687db96d56Sopenharmony_ci        if self.nntp_version == 2:
8697db96d56Sopenharmony_ci            self.assertIn('AUTHINFO', self.server._caps)
8707db96d56Sopenharmony_ci        self.server.login('testuser', 'testpw')
8717db96d56Sopenharmony_ci        # if AUTHINFO is gone from _caps we also know that getcapabilities()
8727db96d56Sopenharmony_ci        # has been called after login as it should
8737db96d56Sopenharmony_ci        self.assertNotIn('AUTHINFO', self.server._caps)
8747db96d56Sopenharmony_ci
8757db96d56Sopenharmony_ci    def test_date(self):
8767db96d56Sopenharmony_ci        resp, date = self.server.date()
8777db96d56Sopenharmony_ci        self.assertEqual(resp, "111 20100914001155")
8787db96d56Sopenharmony_ci        self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
8797db96d56Sopenharmony_ci
8807db96d56Sopenharmony_ci    def test_quit(self):
8817db96d56Sopenharmony_ci        self.assertFalse(self.sio.closed)
8827db96d56Sopenharmony_ci        resp = self.server.quit()
8837db96d56Sopenharmony_ci        self.assertEqual(resp, "205 Bye!")
8847db96d56Sopenharmony_ci        self.assertTrue(self.sio.closed)
8857db96d56Sopenharmony_ci
8867db96d56Sopenharmony_ci    def test_help(self):
8877db96d56Sopenharmony_ci        resp, help = self.server.help()
8887db96d56Sopenharmony_ci        self.assertEqual(resp, "100 Legal commands")
8897db96d56Sopenharmony_ci        self.assertEqual(help, [
8907db96d56Sopenharmony_ci            '  authinfo user Name|pass Password|generic <prog> <args>',
8917db96d56Sopenharmony_ci            '  date',
8927db96d56Sopenharmony_ci            '  help',
8937db96d56Sopenharmony_ci            'Report problems to <root@example.org>',
8947db96d56Sopenharmony_ci        ])
8957db96d56Sopenharmony_ci
8967db96d56Sopenharmony_ci    def test_list(self):
8977db96d56Sopenharmony_ci        resp, groups = self.server.list()
8987db96d56Sopenharmony_ci        self.assertEqual(len(groups), 6)
8997db96d56Sopenharmony_ci        g = groups[1]
9007db96d56Sopenharmony_ci        self.assertEqual(g,
9017db96d56Sopenharmony_ci            GroupInfo("comp.lang.python.announce", "0000001153",
9027db96d56Sopenharmony_ci                      "0000000993", "m"))
9037db96d56Sopenharmony_ci        resp, groups = self.server.list("*distutils*")
9047db96d56Sopenharmony_ci        self.assertEqual(len(groups), 2)
9057db96d56Sopenharmony_ci        g = groups[0]
9067db96d56Sopenharmony_ci        self.assertEqual(g,
9077db96d56Sopenharmony_ci            GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
9087db96d56Sopenharmony_ci                      "0000000001", "m"))
9097db96d56Sopenharmony_ci
9107db96d56Sopenharmony_ci    def test_stat(self):
9117db96d56Sopenharmony_ci        resp, art_num, message_id = self.server.stat(3000234)
9127db96d56Sopenharmony_ci        self.assertEqual(resp, "223 3000234 <45223423@example.com>")
9137db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000234)
9147db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
9157db96d56Sopenharmony_ci        resp, art_num, message_id = self.server.stat("<45223423@example.com>")
9167db96d56Sopenharmony_ci        self.assertEqual(resp, "223 0 <45223423@example.com>")
9177db96d56Sopenharmony_ci        self.assertEqual(art_num, 0)
9187db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
9197db96d56Sopenharmony_ci        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
9207db96d56Sopenharmony_ci            self.server.stat("<non.existent.id>")
9217db96d56Sopenharmony_ci        self.assertEqual(cm.exception.response, "430 No Such Article Found")
9227db96d56Sopenharmony_ci        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
9237db96d56Sopenharmony_ci            self.server.stat()
9247db96d56Sopenharmony_ci        self.assertEqual(cm.exception.response, "412 No newsgroup selected")
9257db96d56Sopenharmony_ci
9267db96d56Sopenharmony_ci    def test_next(self):
9277db96d56Sopenharmony_ci        resp, art_num, message_id = self.server.next()
9287db96d56Sopenharmony_ci        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
9297db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000237)
9307db96d56Sopenharmony_ci        self.assertEqual(message_id, "<668929@example.org>")
9317db96d56Sopenharmony_ci
9327db96d56Sopenharmony_ci    def test_last(self):
9337db96d56Sopenharmony_ci        resp, art_num, message_id = self.server.last()
9347db96d56Sopenharmony_ci        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
9357db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000234)
9367db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
9377db96d56Sopenharmony_ci
9387db96d56Sopenharmony_ci    def test_description(self):
9397db96d56Sopenharmony_ci        desc = self.server.description("comp.lang.python")
9407db96d56Sopenharmony_ci        self.assertEqual(desc, "The Python computer language.")
9417db96d56Sopenharmony_ci        desc = self.server.description("comp.lang.pythonx")
9427db96d56Sopenharmony_ci        self.assertEqual(desc, "")
9437db96d56Sopenharmony_ci
9447db96d56Sopenharmony_ci    def test_descriptions(self):
9457db96d56Sopenharmony_ci        resp, groups = self.server.descriptions("comp.lang.python")
9467db96d56Sopenharmony_ci        self.assertEqual(resp, '215 Descriptions in form "group description".')
9477db96d56Sopenharmony_ci        self.assertEqual(groups, {
9487db96d56Sopenharmony_ci            "comp.lang.python": "The Python computer language.",
9497db96d56Sopenharmony_ci            })
9507db96d56Sopenharmony_ci        resp, groups = self.server.descriptions("comp.lang.python*")
9517db96d56Sopenharmony_ci        self.assertEqual(groups, {
9527db96d56Sopenharmony_ci            "comp.lang.python": "The Python computer language.",
9537db96d56Sopenharmony_ci            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
9547db96d56Sopenharmony_ci            })
9557db96d56Sopenharmony_ci        resp, groups = self.server.descriptions("comp.lang.pythonx")
9567db96d56Sopenharmony_ci        self.assertEqual(groups, {})
9577db96d56Sopenharmony_ci
9587db96d56Sopenharmony_ci    def test_group(self):
9597db96d56Sopenharmony_ci        resp, count, first, last, group = self.server.group("fr.comp.lang.python")
9607db96d56Sopenharmony_ci        self.assertTrue(resp.startswith("211 "), resp)
9617db96d56Sopenharmony_ci        self.assertEqual(first, 761)
9627db96d56Sopenharmony_ci        self.assertEqual(last, 1265)
9637db96d56Sopenharmony_ci        self.assertEqual(count, 486)
9647db96d56Sopenharmony_ci        self.assertEqual(group, "fr.comp.lang.python")
9657db96d56Sopenharmony_ci        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
9667db96d56Sopenharmony_ci            self.server.group("comp.lang.python.devel")
9677db96d56Sopenharmony_ci        exc = cm.exception
9687db96d56Sopenharmony_ci        self.assertTrue(exc.response.startswith("411 No such group"),
9697db96d56Sopenharmony_ci                        exc.response)
9707db96d56Sopenharmony_ci
9717db96d56Sopenharmony_ci    def test_newnews(self):
9727db96d56Sopenharmony_ci        # NEWNEWS comp.lang.python [20]100913 082004
9737db96d56Sopenharmony_ci        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
9747db96d56Sopenharmony_ci        resp, ids = self.server.newnews("comp.lang.python", dt)
9757db96d56Sopenharmony_ci        expected = (
9767db96d56Sopenharmony_ci            "230 list of newsarticles (NNTP v{0}) "
9777db96d56Sopenharmony_ci            "created after Mon Sep 13 08:20:04 2010 follows"
9787db96d56Sopenharmony_ci            ).format(self.nntp_version)
9797db96d56Sopenharmony_ci        self.assertEqual(resp, expected)
9807db96d56Sopenharmony_ci        self.assertEqual(ids, [
9817db96d56Sopenharmony_ci            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
9827db96d56Sopenharmony_ci            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
9837db96d56Sopenharmony_ci            ])
9847db96d56Sopenharmony_ci        # NEWNEWS fr.comp.lang.python [20]100913 082004
9857db96d56Sopenharmony_ci        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
9867db96d56Sopenharmony_ci        resp, ids = self.server.newnews("fr.comp.lang.python", dt)
9877db96d56Sopenharmony_ci        self.assertEqual(resp, "230 An empty list of newsarticles follows")
9887db96d56Sopenharmony_ci        self.assertEqual(ids, [])
9897db96d56Sopenharmony_ci
9907db96d56Sopenharmony_ci    def _check_article_body(self, lines):
9917db96d56Sopenharmony_ci        self.assertEqual(len(lines), 4)
9927db96d56Sopenharmony_ci        self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
9937db96d56Sopenharmony_ci        self.assertEqual(lines[-2], b"")
9947db96d56Sopenharmony_ci        self.assertEqual(lines[-3], b".Here is a dot-starting line.")
9957db96d56Sopenharmony_ci        self.assertEqual(lines[-4], b"This is just a test article.")
9967db96d56Sopenharmony_ci
9977db96d56Sopenharmony_ci    def _check_article_head(self, lines):
9987db96d56Sopenharmony_ci        self.assertEqual(len(lines), 4)
9997db96d56Sopenharmony_ci        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
10007db96d56Sopenharmony_ci        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
10017db96d56Sopenharmony_ci
10027db96d56Sopenharmony_ci    def _check_article_data(self, lines):
10037db96d56Sopenharmony_ci        self.assertEqual(len(lines), 9)
10047db96d56Sopenharmony_ci        self._check_article_head(lines[:4])
10057db96d56Sopenharmony_ci        self._check_article_body(lines[-4:])
10067db96d56Sopenharmony_ci        self.assertEqual(lines[4], b"")
10077db96d56Sopenharmony_ci
10087db96d56Sopenharmony_ci    def test_article(self):
10097db96d56Sopenharmony_ci        # ARTICLE
10107db96d56Sopenharmony_ci        resp, info = self.server.article()
10117db96d56Sopenharmony_ci        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
10127db96d56Sopenharmony_ci        art_num, message_id, lines = info
10137db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000237)
10147db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
10157db96d56Sopenharmony_ci        self._check_article_data(lines)
10167db96d56Sopenharmony_ci        # ARTICLE num
10177db96d56Sopenharmony_ci        resp, info = self.server.article(3000234)
10187db96d56Sopenharmony_ci        self.assertEqual(resp, "220 3000234 <45223423@example.com>")
10197db96d56Sopenharmony_ci        art_num, message_id, lines = info
10207db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000234)
10217db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
10227db96d56Sopenharmony_ci        self._check_article_data(lines)
10237db96d56Sopenharmony_ci        # ARTICLE id
10247db96d56Sopenharmony_ci        resp, info = self.server.article("<45223423@example.com>")
10257db96d56Sopenharmony_ci        self.assertEqual(resp, "220 0 <45223423@example.com>")
10267db96d56Sopenharmony_ci        art_num, message_id, lines = info
10277db96d56Sopenharmony_ci        self.assertEqual(art_num, 0)
10287db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
10297db96d56Sopenharmony_ci        self._check_article_data(lines)
10307db96d56Sopenharmony_ci        # Non-existent id
10317db96d56Sopenharmony_ci        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
10327db96d56Sopenharmony_ci            self.server.article("<non-existent@example.com>")
10337db96d56Sopenharmony_ci        self.assertEqual(cm.exception.response, "430 No Such Article Found")
10347db96d56Sopenharmony_ci
10357db96d56Sopenharmony_ci    def test_article_file(self):
10367db96d56Sopenharmony_ci        # With a "file" argument
10377db96d56Sopenharmony_ci        f = io.BytesIO()
10387db96d56Sopenharmony_ci        resp, info = self.server.article(file=f)
10397db96d56Sopenharmony_ci        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
10407db96d56Sopenharmony_ci        art_num, message_id, lines = info
10417db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000237)
10427db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
10437db96d56Sopenharmony_ci        self.assertEqual(lines, [])
10447db96d56Sopenharmony_ci        data = f.getvalue()
10457db96d56Sopenharmony_ci        self.assertTrue(data.startswith(
10467db96d56Sopenharmony_ci            b'From: "Demo User" <nobody@example.net>\r\n'
10477db96d56Sopenharmony_ci            b'Subject: I am just a test article\r\n'
10487db96d56Sopenharmony_ci            ), ascii(data))
10497db96d56Sopenharmony_ci        self.assertTrue(data.endswith(
10507db96d56Sopenharmony_ci            b'This is just a test article.\r\n'
10517db96d56Sopenharmony_ci            b'.Here is a dot-starting line.\r\n'
10527db96d56Sopenharmony_ci            b'\r\n'
10537db96d56Sopenharmony_ci            b'-- Signed by Andr\xc3\xa9.\r\n'
10547db96d56Sopenharmony_ci            ), ascii(data))
10557db96d56Sopenharmony_ci
10567db96d56Sopenharmony_ci    def test_head(self):
10577db96d56Sopenharmony_ci        # HEAD
10587db96d56Sopenharmony_ci        resp, info = self.server.head()
10597db96d56Sopenharmony_ci        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
10607db96d56Sopenharmony_ci        art_num, message_id, lines = info
10617db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000237)
10627db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
10637db96d56Sopenharmony_ci        self._check_article_head(lines)
10647db96d56Sopenharmony_ci        # HEAD num
10657db96d56Sopenharmony_ci        resp, info = self.server.head(3000234)
10667db96d56Sopenharmony_ci        self.assertEqual(resp, "221 3000234 <45223423@example.com>")
10677db96d56Sopenharmony_ci        art_num, message_id, lines = info
10687db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000234)
10697db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
10707db96d56Sopenharmony_ci        self._check_article_head(lines)
10717db96d56Sopenharmony_ci        # HEAD id
10727db96d56Sopenharmony_ci        resp, info = self.server.head("<45223423@example.com>")
10737db96d56Sopenharmony_ci        self.assertEqual(resp, "221 0 <45223423@example.com>")
10747db96d56Sopenharmony_ci        art_num, message_id, lines = info
10757db96d56Sopenharmony_ci        self.assertEqual(art_num, 0)
10767db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
10777db96d56Sopenharmony_ci        self._check_article_head(lines)
10787db96d56Sopenharmony_ci        # Non-existent id
10797db96d56Sopenharmony_ci        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
10807db96d56Sopenharmony_ci            self.server.head("<non-existent@example.com>")
10817db96d56Sopenharmony_ci        self.assertEqual(cm.exception.response, "430 No Such Article Found")
10827db96d56Sopenharmony_ci
10837db96d56Sopenharmony_ci    def test_head_file(self):
10847db96d56Sopenharmony_ci        f = io.BytesIO()
10857db96d56Sopenharmony_ci        resp, info = self.server.head(file=f)
10867db96d56Sopenharmony_ci        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
10877db96d56Sopenharmony_ci        art_num, message_id, lines = info
10887db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000237)
10897db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
10907db96d56Sopenharmony_ci        self.assertEqual(lines, [])
10917db96d56Sopenharmony_ci        data = f.getvalue()
10927db96d56Sopenharmony_ci        self.assertTrue(data.startswith(
10937db96d56Sopenharmony_ci            b'From: "Demo User" <nobody@example.net>\r\n'
10947db96d56Sopenharmony_ci            b'Subject: I am just a test article\r\n'
10957db96d56Sopenharmony_ci            ), ascii(data))
10967db96d56Sopenharmony_ci        self.assertFalse(data.endswith(
10977db96d56Sopenharmony_ci            b'This is just a test article.\r\n'
10987db96d56Sopenharmony_ci            b'.Here is a dot-starting line.\r\n'
10997db96d56Sopenharmony_ci            b'\r\n'
11007db96d56Sopenharmony_ci            b'-- Signed by Andr\xc3\xa9.\r\n'
11017db96d56Sopenharmony_ci            ), ascii(data))
11027db96d56Sopenharmony_ci
11037db96d56Sopenharmony_ci    def test_body(self):
11047db96d56Sopenharmony_ci        # BODY
11057db96d56Sopenharmony_ci        resp, info = self.server.body()
11067db96d56Sopenharmony_ci        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
11077db96d56Sopenharmony_ci        art_num, message_id, lines = info
11087db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000237)
11097db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
11107db96d56Sopenharmony_ci        self._check_article_body(lines)
11117db96d56Sopenharmony_ci        # BODY num
11127db96d56Sopenharmony_ci        resp, info = self.server.body(3000234)
11137db96d56Sopenharmony_ci        self.assertEqual(resp, "222 3000234 <45223423@example.com>")
11147db96d56Sopenharmony_ci        art_num, message_id, lines = info
11157db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000234)
11167db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
11177db96d56Sopenharmony_ci        self._check_article_body(lines)
11187db96d56Sopenharmony_ci        # BODY id
11197db96d56Sopenharmony_ci        resp, info = self.server.body("<45223423@example.com>")
11207db96d56Sopenharmony_ci        self.assertEqual(resp, "222 0 <45223423@example.com>")
11217db96d56Sopenharmony_ci        art_num, message_id, lines = info
11227db96d56Sopenharmony_ci        self.assertEqual(art_num, 0)
11237db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
11247db96d56Sopenharmony_ci        self._check_article_body(lines)
11257db96d56Sopenharmony_ci        # Non-existent id
11267db96d56Sopenharmony_ci        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
11277db96d56Sopenharmony_ci            self.server.body("<non-existent@example.com>")
11287db96d56Sopenharmony_ci        self.assertEqual(cm.exception.response, "430 No Such Article Found")
11297db96d56Sopenharmony_ci
11307db96d56Sopenharmony_ci    def test_body_file(self):
11317db96d56Sopenharmony_ci        f = io.BytesIO()
11327db96d56Sopenharmony_ci        resp, info = self.server.body(file=f)
11337db96d56Sopenharmony_ci        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
11347db96d56Sopenharmony_ci        art_num, message_id, lines = info
11357db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000237)
11367db96d56Sopenharmony_ci        self.assertEqual(message_id, "<45223423@example.com>")
11377db96d56Sopenharmony_ci        self.assertEqual(lines, [])
11387db96d56Sopenharmony_ci        data = f.getvalue()
11397db96d56Sopenharmony_ci        self.assertFalse(data.startswith(
11407db96d56Sopenharmony_ci            b'From: "Demo User" <nobody@example.net>\r\n'
11417db96d56Sopenharmony_ci            b'Subject: I am just a test article\r\n'
11427db96d56Sopenharmony_ci            ), ascii(data))
11437db96d56Sopenharmony_ci        self.assertTrue(data.endswith(
11447db96d56Sopenharmony_ci            b'This is just a test article.\r\n'
11457db96d56Sopenharmony_ci            b'.Here is a dot-starting line.\r\n'
11467db96d56Sopenharmony_ci            b'\r\n'
11477db96d56Sopenharmony_ci            b'-- Signed by Andr\xc3\xa9.\r\n'
11487db96d56Sopenharmony_ci            ), ascii(data))
11497db96d56Sopenharmony_ci
11507db96d56Sopenharmony_ci    def check_over_xover_resp(self, resp, overviews):
11517db96d56Sopenharmony_ci        self.assertTrue(resp.startswith("224 "), resp)
11527db96d56Sopenharmony_ci        self.assertEqual(len(overviews), 3)
11537db96d56Sopenharmony_ci        art_num, over = overviews[0]
11547db96d56Sopenharmony_ci        self.assertEqual(art_num, 57)
11557db96d56Sopenharmony_ci        self.assertEqual(over, {
11567db96d56Sopenharmony_ci            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
11577db96d56Sopenharmony_ci            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
11587db96d56Sopenharmony_ci            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
11597db96d56Sopenharmony_ci            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
11607db96d56Sopenharmony_ci            "references": "<hvalf7$ort$1@dough.gmane.org>",
11617db96d56Sopenharmony_ci            ":bytes": "7103",
11627db96d56Sopenharmony_ci            ":lines": "16",
11637db96d56Sopenharmony_ci            "xref": "news.gmane.io gmane.comp.python.authors:57"
11647db96d56Sopenharmony_ci            })
11657db96d56Sopenharmony_ci        art_num, over = overviews[1]
11667db96d56Sopenharmony_ci        self.assertEqual(over["xref"], None)
11677db96d56Sopenharmony_ci        art_num, over = overviews[2]
11687db96d56Sopenharmony_ci        self.assertEqual(over["subject"],
11697db96d56Sopenharmony_ci                         "Re: Message d'erreur incompréhensible (par moi)")
11707db96d56Sopenharmony_ci
11717db96d56Sopenharmony_ci    def test_xover(self):
11727db96d56Sopenharmony_ci        resp, overviews = self.server.xover(57, 59)
11737db96d56Sopenharmony_ci        self.check_over_xover_resp(resp, overviews)
11747db96d56Sopenharmony_ci
11757db96d56Sopenharmony_ci    def test_over(self):
11767db96d56Sopenharmony_ci        # In NNTP "v1", this will fallback on XOVER
11777db96d56Sopenharmony_ci        resp, overviews = self.server.over((57, 59))
11787db96d56Sopenharmony_ci        self.check_over_xover_resp(resp, overviews)
11797db96d56Sopenharmony_ci
11807db96d56Sopenharmony_ci    sample_post = (
11817db96d56Sopenharmony_ci        b'From: "Demo User" <nobody@example.net>\r\n'
11827db96d56Sopenharmony_ci        b'Subject: I am just a test article\r\n'
11837db96d56Sopenharmony_ci        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
11847db96d56Sopenharmony_ci        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
11857db96d56Sopenharmony_ci        b'\r\n'
11867db96d56Sopenharmony_ci        b'This is just a test article.\r\n'
11877db96d56Sopenharmony_ci        b'.Here is a dot-starting line.\r\n'
11887db96d56Sopenharmony_ci        b'\r\n'
11897db96d56Sopenharmony_ci        b'-- Signed by Andr\xc3\xa9.\r\n'
11907db96d56Sopenharmony_ci    )
11917db96d56Sopenharmony_ci
11927db96d56Sopenharmony_ci    def _check_posted_body(self):
11937db96d56Sopenharmony_ci        # Check the raw body as received by the server
11947db96d56Sopenharmony_ci        lines = self.handler.posted_body
11957db96d56Sopenharmony_ci        # One additional line for the "." terminator
11967db96d56Sopenharmony_ci        self.assertEqual(len(lines), 10)
11977db96d56Sopenharmony_ci        self.assertEqual(lines[-1], b'.\r\n')
11987db96d56Sopenharmony_ci        self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
11997db96d56Sopenharmony_ci        self.assertEqual(lines[-3], b'\r\n')
12007db96d56Sopenharmony_ci        self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
12017db96d56Sopenharmony_ci        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
12027db96d56Sopenharmony_ci
12037db96d56Sopenharmony_ci    def _check_post_ihave_sub(self, func, *args, file_factory):
12047db96d56Sopenharmony_ci        # First the prepared post with CRLF endings
12057db96d56Sopenharmony_ci        post = self.sample_post
12067db96d56Sopenharmony_ci        func_args = args + (file_factory(post),)
12077db96d56Sopenharmony_ci        self.handler.posted_body = None
12087db96d56Sopenharmony_ci        resp = func(*func_args)
12097db96d56Sopenharmony_ci        self._check_posted_body()
12107db96d56Sopenharmony_ci        # Then the same post with "normal" line endings - they should be
12117db96d56Sopenharmony_ci        # converted by NNTP.post and NNTP.ihave.
12127db96d56Sopenharmony_ci        post = self.sample_post.replace(b"\r\n", b"\n")
12137db96d56Sopenharmony_ci        func_args = args + (file_factory(post),)
12147db96d56Sopenharmony_ci        self.handler.posted_body = None
12157db96d56Sopenharmony_ci        resp = func(*func_args)
12167db96d56Sopenharmony_ci        self._check_posted_body()
12177db96d56Sopenharmony_ci        return resp
12187db96d56Sopenharmony_ci
12197db96d56Sopenharmony_ci    def check_post_ihave(self, func, success_resp, *args):
12207db96d56Sopenharmony_ci        # With a bytes object
12217db96d56Sopenharmony_ci        resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
12227db96d56Sopenharmony_ci        self.assertEqual(resp, success_resp)
12237db96d56Sopenharmony_ci        # With a bytearray object
12247db96d56Sopenharmony_ci        resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
12257db96d56Sopenharmony_ci        self.assertEqual(resp, success_resp)
12267db96d56Sopenharmony_ci        # With a file object
12277db96d56Sopenharmony_ci        resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
12287db96d56Sopenharmony_ci        self.assertEqual(resp, success_resp)
12297db96d56Sopenharmony_ci        # With an iterable of terminated lines
12307db96d56Sopenharmony_ci        def iterlines(b):
12317db96d56Sopenharmony_ci            return iter(b.splitlines(keepends=True))
12327db96d56Sopenharmony_ci        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
12337db96d56Sopenharmony_ci        self.assertEqual(resp, success_resp)
12347db96d56Sopenharmony_ci        # With an iterable of non-terminated lines
12357db96d56Sopenharmony_ci        def iterlines(b):
12367db96d56Sopenharmony_ci            return iter(b.splitlines(keepends=False))
12377db96d56Sopenharmony_ci        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
12387db96d56Sopenharmony_ci        self.assertEqual(resp, success_resp)
12397db96d56Sopenharmony_ci
12407db96d56Sopenharmony_ci    def test_post(self):
12417db96d56Sopenharmony_ci        self.check_post_ihave(self.server.post, "240 Article received OK")
12427db96d56Sopenharmony_ci        self.handler.allow_posting = False
12437db96d56Sopenharmony_ci        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
12447db96d56Sopenharmony_ci            self.server.post(self.sample_post)
12457db96d56Sopenharmony_ci        self.assertEqual(cm.exception.response,
12467db96d56Sopenharmony_ci                         "440 Posting not permitted")
12477db96d56Sopenharmony_ci
12487db96d56Sopenharmony_ci    def test_ihave(self):
12497db96d56Sopenharmony_ci        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
12507db96d56Sopenharmony_ci                              "<i.am.an.article.you.will.want@example.com>")
12517db96d56Sopenharmony_ci        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
12527db96d56Sopenharmony_ci            self.server.ihave("<another.message.id>", self.sample_post)
12537db96d56Sopenharmony_ci        self.assertEqual(cm.exception.response,
12547db96d56Sopenharmony_ci                         "435 Article not wanted")
12557db96d56Sopenharmony_ci
12567db96d56Sopenharmony_ci    def test_too_long_lines(self):
12577db96d56Sopenharmony_ci        dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
12587db96d56Sopenharmony_ci        self.assertRaises(nntplib.NNTPDataError,
12597db96d56Sopenharmony_ci                          self.server.newnews, "comp.lang.python", dt)
12607db96d56Sopenharmony_ci
12617db96d56Sopenharmony_ci
12627db96d56Sopenharmony_ciclass NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
12637db96d56Sopenharmony_ci    """Tests an NNTP v1 server (no capabilities)."""
12647db96d56Sopenharmony_ci
12657db96d56Sopenharmony_ci    nntp_version = 1
12667db96d56Sopenharmony_ci    handler_class = NNTPv1Handler
12677db96d56Sopenharmony_ci
12687db96d56Sopenharmony_ci    def test_caps(self):
12697db96d56Sopenharmony_ci        caps = self.server.getcapabilities()
12707db96d56Sopenharmony_ci        self.assertEqual(caps, {})
12717db96d56Sopenharmony_ci        self.assertEqual(self.server.nntp_version, 1)
12727db96d56Sopenharmony_ci        self.assertEqual(self.server.nntp_implementation, None)
12737db96d56Sopenharmony_ci
12747db96d56Sopenharmony_ci
12757db96d56Sopenharmony_ciclass NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
12767db96d56Sopenharmony_ci    """Tests an NNTP v2 server (with capabilities)."""
12777db96d56Sopenharmony_ci
12787db96d56Sopenharmony_ci    nntp_version = 2
12797db96d56Sopenharmony_ci    handler_class = NNTPv2Handler
12807db96d56Sopenharmony_ci
12817db96d56Sopenharmony_ci    def test_caps(self):
12827db96d56Sopenharmony_ci        caps = self.server.getcapabilities()
12837db96d56Sopenharmony_ci        self.assertEqual(caps, {
12847db96d56Sopenharmony_ci            'VERSION': ['2', '3'],
12857db96d56Sopenharmony_ci            'IMPLEMENTATION': ['INN', '2.5.1'],
12867db96d56Sopenharmony_ci            'AUTHINFO': ['USER'],
12877db96d56Sopenharmony_ci            'HDR': [],
12887db96d56Sopenharmony_ci            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
12897db96d56Sopenharmony_ci                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
12907db96d56Sopenharmony_ci            'OVER': [],
12917db96d56Sopenharmony_ci            'POST': [],
12927db96d56Sopenharmony_ci            'READER': [],
12937db96d56Sopenharmony_ci            })
12947db96d56Sopenharmony_ci        self.assertEqual(self.server.nntp_version, 3)
12957db96d56Sopenharmony_ci        self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
12967db96d56Sopenharmony_ci
12977db96d56Sopenharmony_ci
12987db96d56Sopenharmony_ciclass CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
12997db96d56Sopenharmony_ci    """Tests a probably NNTP v2 server with capabilities only after login."""
13007db96d56Sopenharmony_ci
13017db96d56Sopenharmony_ci    nntp_version = 2
13027db96d56Sopenharmony_ci    handler_class = CapsAfterLoginNNTPv2Handler
13037db96d56Sopenharmony_ci
13047db96d56Sopenharmony_ci    def test_caps_only_after_login(self):
13057db96d56Sopenharmony_ci        self.assertEqual(self.server._caps, {})
13067db96d56Sopenharmony_ci        self.server.login('testuser', 'testpw')
13077db96d56Sopenharmony_ci        self.assertIn('VERSION', self.server._caps)
13087db96d56Sopenharmony_ci
13097db96d56Sopenharmony_ci
13107db96d56Sopenharmony_ciclass SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
13117db96d56Sopenharmony_ci        unittest.TestCase):
13127db96d56Sopenharmony_ci    """Same tests as for v2 but we tell NTTP to send MODE READER to a server
13137db96d56Sopenharmony_ci    that isn't in READER mode by default."""
13147db96d56Sopenharmony_ci
13157db96d56Sopenharmony_ci    nntp_version = 2
13167db96d56Sopenharmony_ci    handler_class = ModeSwitchingNNTPv2Handler
13177db96d56Sopenharmony_ci
13187db96d56Sopenharmony_ci    def test_we_are_in_reader_mode_after_connect(self):
13197db96d56Sopenharmony_ci        self.assertIn('READER', self.server._caps)
13207db96d56Sopenharmony_ci
13217db96d56Sopenharmony_ci
13227db96d56Sopenharmony_ciclass MiscTests(unittest.TestCase):
13237db96d56Sopenharmony_ci
13247db96d56Sopenharmony_ci    def test_decode_header(self):
13257db96d56Sopenharmony_ci        def gives(a, b):
13267db96d56Sopenharmony_ci            self.assertEqual(nntplib.decode_header(a), b)
13277db96d56Sopenharmony_ci        gives("" , "")
13287db96d56Sopenharmony_ci        gives("a plain header", "a plain header")
13297db96d56Sopenharmony_ci        gives(" with extra  spaces ", " with extra  spaces ")
13307db96d56Sopenharmony_ci        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
13317db96d56Sopenharmony_ci        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
13327db96d56Sopenharmony_ci              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
13337db96d56Sopenharmony_ci              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
13347db96d56Sopenharmony_ci        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
13357db96d56Sopenharmony_ci              "Re: problème de matrice")
13367db96d56Sopenharmony_ci        # A natively utf-8 header (found in the real world!)
13377db96d56Sopenharmony_ci        gives("Re: Message d'erreur incompréhensible (par moi)",
13387db96d56Sopenharmony_ci              "Re: Message d'erreur incompréhensible (par moi)")
13397db96d56Sopenharmony_ci
13407db96d56Sopenharmony_ci    def test_parse_overview_fmt(self):
13417db96d56Sopenharmony_ci        # The minimal (default) response
13427db96d56Sopenharmony_ci        lines = ["Subject:", "From:", "Date:", "Message-ID:",
13437db96d56Sopenharmony_ci                 "References:", ":bytes", ":lines"]
13447db96d56Sopenharmony_ci        self.assertEqual(nntplib._parse_overview_fmt(lines),
13457db96d56Sopenharmony_ci            ["subject", "from", "date", "message-id", "references",
13467db96d56Sopenharmony_ci             ":bytes", ":lines"])
13477db96d56Sopenharmony_ci        # The minimal response using alternative names
13487db96d56Sopenharmony_ci        lines = ["Subject:", "From:", "Date:", "Message-ID:",
13497db96d56Sopenharmony_ci                 "References:", "Bytes:", "Lines:"]
13507db96d56Sopenharmony_ci        self.assertEqual(nntplib._parse_overview_fmt(lines),
13517db96d56Sopenharmony_ci            ["subject", "from", "date", "message-id", "references",
13527db96d56Sopenharmony_ci             ":bytes", ":lines"])
13537db96d56Sopenharmony_ci        # Variations in casing
13547db96d56Sopenharmony_ci        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
13557db96d56Sopenharmony_ci                 "References:", "BYTES:", "Lines:"]
13567db96d56Sopenharmony_ci        self.assertEqual(nntplib._parse_overview_fmt(lines),
13577db96d56Sopenharmony_ci            ["subject", "from", "date", "message-id", "references",
13587db96d56Sopenharmony_ci             ":bytes", ":lines"])
13597db96d56Sopenharmony_ci        # First example from RFC 3977
13607db96d56Sopenharmony_ci        lines = ["Subject:", "From:", "Date:", "Message-ID:",
13617db96d56Sopenharmony_ci                 "References:", ":bytes", ":lines", "Xref:full",
13627db96d56Sopenharmony_ci                 "Distribution:full"]
13637db96d56Sopenharmony_ci        self.assertEqual(nntplib._parse_overview_fmt(lines),
13647db96d56Sopenharmony_ci            ["subject", "from", "date", "message-id", "references",
13657db96d56Sopenharmony_ci             ":bytes", ":lines", "xref", "distribution"])
13667db96d56Sopenharmony_ci        # Second example from RFC 3977
13677db96d56Sopenharmony_ci        lines = ["Subject:", "From:", "Date:", "Message-ID:",
13687db96d56Sopenharmony_ci                 "References:", "Bytes:", "Lines:", "Xref:FULL",
13697db96d56Sopenharmony_ci                 "Distribution:FULL"]
13707db96d56Sopenharmony_ci        self.assertEqual(nntplib._parse_overview_fmt(lines),
13717db96d56Sopenharmony_ci            ["subject", "from", "date", "message-id", "references",
13727db96d56Sopenharmony_ci             ":bytes", ":lines", "xref", "distribution"])
13737db96d56Sopenharmony_ci        # A classic response from INN
13747db96d56Sopenharmony_ci        lines = ["Subject:", "From:", "Date:", "Message-ID:",
13757db96d56Sopenharmony_ci                 "References:", "Bytes:", "Lines:", "Xref:full"]
13767db96d56Sopenharmony_ci        self.assertEqual(nntplib._parse_overview_fmt(lines),
13777db96d56Sopenharmony_ci            ["subject", "from", "date", "message-id", "references",
13787db96d56Sopenharmony_ci             ":bytes", ":lines", "xref"])
13797db96d56Sopenharmony_ci
13807db96d56Sopenharmony_ci    def test_parse_overview(self):
13817db96d56Sopenharmony_ci        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
13827db96d56Sopenharmony_ci        # First example from RFC 3977
13837db96d56Sopenharmony_ci        lines = [
13847db96d56Sopenharmony_ci            '3000234\tI am just a test article\t"Demo User" '
13857db96d56Sopenharmony_ci            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
13867db96d56Sopenharmony_ci            '<45223423@example.com>\t<45454@example.net>\t1234\t'
13877db96d56Sopenharmony_ci            '17\tXref: news.example.com misc.test:3000363',
13887db96d56Sopenharmony_ci        ]
13897db96d56Sopenharmony_ci        overview = nntplib._parse_overview(lines, fmt)
13907db96d56Sopenharmony_ci        (art_num, fields), = overview
13917db96d56Sopenharmony_ci        self.assertEqual(art_num, 3000234)
13927db96d56Sopenharmony_ci        self.assertEqual(fields, {
13937db96d56Sopenharmony_ci            'subject': 'I am just a test article',
13947db96d56Sopenharmony_ci            'from': '"Demo User" <nobody@example.com>',
13957db96d56Sopenharmony_ci            'date': '6 Oct 1998 04:38:40 -0500',
13967db96d56Sopenharmony_ci            'message-id': '<45223423@example.com>',
13977db96d56Sopenharmony_ci            'references': '<45454@example.net>',
13987db96d56Sopenharmony_ci            ':bytes': '1234',
13997db96d56Sopenharmony_ci            ':lines': '17',
14007db96d56Sopenharmony_ci            'xref': 'news.example.com misc.test:3000363',
14017db96d56Sopenharmony_ci        })
14027db96d56Sopenharmony_ci        # Second example; here the "Xref" field is totally absent (including
14037db96d56Sopenharmony_ci        # the header name) and comes out as None
14047db96d56Sopenharmony_ci        lines = [
14057db96d56Sopenharmony_ci            '3000234\tI am just a test article\t"Demo User" '
14067db96d56Sopenharmony_ci            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
14077db96d56Sopenharmony_ci            '<45223423@example.com>\t<45454@example.net>\t1234\t'
14087db96d56Sopenharmony_ci            '17\t\t',
14097db96d56Sopenharmony_ci        ]
14107db96d56Sopenharmony_ci        overview = nntplib._parse_overview(lines, fmt)
14117db96d56Sopenharmony_ci        (art_num, fields), = overview
14127db96d56Sopenharmony_ci        self.assertEqual(fields['xref'], None)
14137db96d56Sopenharmony_ci        # Third example; the "Xref" is an empty string, while "references"
14147db96d56Sopenharmony_ci        # is a single space.
14157db96d56Sopenharmony_ci        lines = [
14167db96d56Sopenharmony_ci            '3000234\tI am just a test article\t"Demo User" '
14177db96d56Sopenharmony_ci            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
14187db96d56Sopenharmony_ci            '<45223423@example.com>\t \t1234\t'
14197db96d56Sopenharmony_ci            '17\tXref: \t',
14207db96d56Sopenharmony_ci        ]
14217db96d56Sopenharmony_ci        overview = nntplib._parse_overview(lines, fmt)
14227db96d56Sopenharmony_ci        (art_num, fields), = overview
14237db96d56Sopenharmony_ci        self.assertEqual(fields['references'], ' ')
14247db96d56Sopenharmony_ci        self.assertEqual(fields['xref'], '')
14257db96d56Sopenharmony_ci
14267db96d56Sopenharmony_ci    def test_parse_datetime(self):
14277db96d56Sopenharmony_ci        def gives(a, b, *c):
14287db96d56Sopenharmony_ci            self.assertEqual(nntplib._parse_datetime(a, b),
14297db96d56Sopenharmony_ci                             datetime.datetime(*c))
14307db96d56Sopenharmony_ci        # Output of DATE command
14317db96d56Sopenharmony_ci        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
14327db96d56Sopenharmony_ci        # Variations
14337db96d56Sopenharmony_ci        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
14347db96d56Sopenharmony_ci        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
14357db96d56Sopenharmony_ci        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
14367db96d56Sopenharmony_ci
14377db96d56Sopenharmony_ci    def test_unparse_datetime(self):
14387db96d56Sopenharmony_ci        # Test non-legacy mode
14397db96d56Sopenharmony_ci        # 1) with a datetime
14407db96d56Sopenharmony_ci        def gives(y, M, d, h, m, s, date_str, time_str):
14417db96d56Sopenharmony_ci            dt = datetime.datetime(y, M, d, h, m, s)
14427db96d56Sopenharmony_ci            self.assertEqual(nntplib._unparse_datetime(dt),
14437db96d56Sopenharmony_ci                             (date_str, time_str))
14447db96d56Sopenharmony_ci            self.assertEqual(nntplib._unparse_datetime(dt, False),
14457db96d56Sopenharmony_ci                             (date_str, time_str))
14467db96d56Sopenharmony_ci        gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
14477db96d56Sopenharmony_ci        gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
14487db96d56Sopenharmony_ci        gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
14497db96d56Sopenharmony_ci        # 2) with a date
14507db96d56Sopenharmony_ci        def gives(y, M, d, date_str, time_str):
14517db96d56Sopenharmony_ci            dt = datetime.date(y, M, d)
14527db96d56Sopenharmony_ci            self.assertEqual(nntplib._unparse_datetime(dt),
14537db96d56Sopenharmony_ci                             (date_str, time_str))
14547db96d56Sopenharmony_ci            self.assertEqual(nntplib._unparse_datetime(dt, False),
14557db96d56Sopenharmony_ci                             (date_str, time_str))
14567db96d56Sopenharmony_ci        gives(1999, 6, 23, "19990623", "000000")
14577db96d56Sopenharmony_ci        gives(2000, 6, 23, "20000623", "000000")
14587db96d56Sopenharmony_ci        gives(2010, 6, 5, "20100605", "000000")
14597db96d56Sopenharmony_ci
14607db96d56Sopenharmony_ci    def test_unparse_datetime_legacy(self):
14617db96d56Sopenharmony_ci        # Test legacy mode (RFC 977)
14627db96d56Sopenharmony_ci        # 1) with a datetime
14637db96d56Sopenharmony_ci        def gives(y, M, d, h, m, s, date_str, time_str):
14647db96d56Sopenharmony_ci            dt = datetime.datetime(y, M, d, h, m, s)
14657db96d56Sopenharmony_ci            self.assertEqual(nntplib._unparse_datetime(dt, True),
14667db96d56Sopenharmony_ci                             (date_str, time_str))
14677db96d56Sopenharmony_ci        gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
14687db96d56Sopenharmony_ci        gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
14697db96d56Sopenharmony_ci        gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
14707db96d56Sopenharmony_ci        # 2) with a date
14717db96d56Sopenharmony_ci        def gives(y, M, d, date_str, time_str):
14727db96d56Sopenharmony_ci            dt = datetime.date(y, M, d)
14737db96d56Sopenharmony_ci            self.assertEqual(nntplib._unparse_datetime(dt, True),
14747db96d56Sopenharmony_ci                             (date_str, time_str))
14757db96d56Sopenharmony_ci        gives(1999, 6, 23, "990623", "000000")
14767db96d56Sopenharmony_ci        gives(2000, 6, 23, "000623", "000000")
14777db96d56Sopenharmony_ci        gives(2010, 6, 5, "100605", "000000")
14787db96d56Sopenharmony_ci
14797db96d56Sopenharmony_ci    @unittest.skipUnless(ssl, 'requires SSL support')
14807db96d56Sopenharmony_ci    def test_ssl_support(self):
14817db96d56Sopenharmony_ci        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
14827db96d56Sopenharmony_ci
14837db96d56Sopenharmony_ci
14847db96d56Sopenharmony_ciclass PublicAPITests(unittest.TestCase):
14857db96d56Sopenharmony_ci    """Ensures that the correct values are exposed in the public API."""
14867db96d56Sopenharmony_ci
14877db96d56Sopenharmony_ci    def test_module_all_attribute(self):
14887db96d56Sopenharmony_ci        self.assertTrue(hasattr(nntplib, '__all__'))
14897db96d56Sopenharmony_ci        target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
14907db96d56Sopenharmony_ci                      'NNTPTemporaryError', 'NNTPPermanentError',
14917db96d56Sopenharmony_ci                      'NNTPProtocolError', 'NNTPDataError', 'decode_header']
14927db96d56Sopenharmony_ci        if ssl is not None:
14937db96d56Sopenharmony_ci            target_api.append('NNTP_SSL')
14947db96d56Sopenharmony_ci        self.assertEqual(set(nntplib.__all__), set(target_api))
14957db96d56Sopenharmony_ci
14967db96d56Sopenharmony_ciclass MockSocketTests(unittest.TestCase):
14977db96d56Sopenharmony_ci    """Tests involving a mock socket object
14987db96d56Sopenharmony_ci
14997db96d56Sopenharmony_ci    Used where the _NNTPServerIO file object is not enough."""
15007db96d56Sopenharmony_ci
15017db96d56Sopenharmony_ci    nntp_class = nntplib.NNTP
15027db96d56Sopenharmony_ci
15037db96d56Sopenharmony_ci    def check_constructor_error_conditions(
15047db96d56Sopenharmony_ci            self, handler_class,
15057db96d56Sopenharmony_ci            expected_error_type, expected_error_msg,
15067db96d56Sopenharmony_ci            login=None, password=None):
15077db96d56Sopenharmony_ci
15087db96d56Sopenharmony_ci        class mock_socket_module:
15097db96d56Sopenharmony_ci            def create_connection(address, timeout):
15107db96d56Sopenharmony_ci                return MockSocket()
15117db96d56Sopenharmony_ci
15127db96d56Sopenharmony_ci        class MockSocket:
15137db96d56Sopenharmony_ci            def close(self):
15147db96d56Sopenharmony_ci                nonlocal socket_closed
15157db96d56Sopenharmony_ci                socket_closed = True
15167db96d56Sopenharmony_ci
15177db96d56Sopenharmony_ci            def makefile(socket, mode):
15187db96d56Sopenharmony_ci                handler = handler_class()
15197db96d56Sopenharmony_ci                _, file = make_mock_file(handler)
15207db96d56Sopenharmony_ci                files.append(file)
15217db96d56Sopenharmony_ci                return file
15227db96d56Sopenharmony_ci
15237db96d56Sopenharmony_ci        socket_closed = False
15247db96d56Sopenharmony_ci        files = []
15257db96d56Sopenharmony_ci        with patch('nntplib.socket', mock_socket_module), \
15267db96d56Sopenharmony_ci             self.assertRaisesRegex(expected_error_type, expected_error_msg):
15277db96d56Sopenharmony_ci            self.nntp_class('dummy', user=login, password=password)
15287db96d56Sopenharmony_ci        self.assertTrue(socket_closed)
15297db96d56Sopenharmony_ci        for f in files:
15307db96d56Sopenharmony_ci            self.assertTrue(f.closed)
15317db96d56Sopenharmony_ci
15327db96d56Sopenharmony_ci    def test_bad_welcome(self):
15337db96d56Sopenharmony_ci        #Test a bad welcome message
15347db96d56Sopenharmony_ci        class Handler(NNTPv1Handler):
15357db96d56Sopenharmony_ci            welcome = 'Bad Welcome'
15367db96d56Sopenharmony_ci        self.check_constructor_error_conditions(
15377db96d56Sopenharmony_ci            Handler, nntplib.NNTPProtocolError, Handler.welcome)
15387db96d56Sopenharmony_ci
15397db96d56Sopenharmony_ci    def test_service_temporarily_unavailable(self):
15407db96d56Sopenharmony_ci        #Test service temporarily unavailable
15417db96d56Sopenharmony_ci        class Handler(NNTPv1Handler):
15427db96d56Sopenharmony_ci            welcome = '400 Service temporarily unavailable'
15437db96d56Sopenharmony_ci        self.check_constructor_error_conditions(
15447db96d56Sopenharmony_ci            Handler, nntplib.NNTPTemporaryError, Handler.welcome)
15457db96d56Sopenharmony_ci
15467db96d56Sopenharmony_ci    def test_service_permanently_unavailable(self):
15477db96d56Sopenharmony_ci        #Test service permanently unavailable
15487db96d56Sopenharmony_ci        class Handler(NNTPv1Handler):
15497db96d56Sopenharmony_ci            welcome = '502 Service permanently unavailable'
15507db96d56Sopenharmony_ci        self.check_constructor_error_conditions(
15517db96d56Sopenharmony_ci            Handler, nntplib.NNTPPermanentError, Handler.welcome)
15527db96d56Sopenharmony_ci
15537db96d56Sopenharmony_ci    def test_bad_capabilities(self):
15547db96d56Sopenharmony_ci        #Test a bad capabilities response
15557db96d56Sopenharmony_ci        class Handler(NNTPv1Handler):
15567db96d56Sopenharmony_ci            def handle_CAPABILITIES(self):
15577db96d56Sopenharmony_ci                self.push_lit(capabilities_response)
15587db96d56Sopenharmony_ci        capabilities_response = '201 bad capability'
15597db96d56Sopenharmony_ci        self.check_constructor_error_conditions(
15607db96d56Sopenharmony_ci            Handler, nntplib.NNTPReplyError, capabilities_response)
15617db96d56Sopenharmony_ci
15627db96d56Sopenharmony_ci    def test_login_aborted(self):
15637db96d56Sopenharmony_ci        #Test a bad authinfo response
15647db96d56Sopenharmony_ci        login = 't@e.com'
15657db96d56Sopenharmony_ci        password = 'python'
15667db96d56Sopenharmony_ci        class Handler(NNTPv1Handler):
15677db96d56Sopenharmony_ci            def handle_AUTHINFO(self, *args):
15687db96d56Sopenharmony_ci                self.push_lit(authinfo_response)
15697db96d56Sopenharmony_ci        authinfo_response = '503 Mechanism not recognized'
15707db96d56Sopenharmony_ci        self.check_constructor_error_conditions(
15717db96d56Sopenharmony_ci            Handler, nntplib.NNTPPermanentError, authinfo_response,
15727db96d56Sopenharmony_ci            login, password)
15737db96d56Sopenharmony_ci
15747db96d56Sopenharmony_ciclass bypass_context:
15757db96d56Sopenharmony_ci    """Bypass encryption and actual SSL module"""
15767db96d56Sopenharmony_ci    def wrap_socket(sock, **args):
15777db96d56Sopenharmony_ci        return sock
15787db96d56Sopenharmony_ci
15797db96d56Sopenharmony_ci@unittest.skipUnless(ssl, 'requires SSL support')
15807db96d56Sopenharmony_ciclass MockSslTests(MockSocketTests):
15817db96d56Sopenharmony_ci    @staticmethod
15827db96d56Sopenharmony_ci    def nntp_class(*pos, **kw):
15837db96d56Sopenharmony_ci        return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
15847db96d56Sopenharmony_ci
15857db96d56Sopenharmony_ci
15867db96d56Sopenharmony_ciclass LocalServerTests(unittest.TestCase):
15877db96d56Sopenharmony_ci    def setUp(self):
15887db96d56Sopenharmony_ci        sock = socket.socket()
15897db96d56Sopenharmony_ci        port = socket_helper.bind_port(sock)
15907db96d56Sopenharmony_ci        sock.listen()
15917db96d56Sopenharmony_ci        self.background = threading.Thread(
15927db96d56Sopenharmony_ci            target=self.run_server, args=(sock,))
15937db96d56Sopenharmony_ci        self.background.start()
15947db96d56Sopenharmony_ci        self.addCleanup(self.background.join)
15957db96d56Sopenharmony_ci
15967db96d56Sopenharmony_ci        self.nntp = self.enterContext(NNTP(socket_helper.HOST, port, usenetrc=False))
15977db96d56Sopenharmony_ci
15987db96d56Sopenharmony_ci    def run_server(self, sock):
15997db96d56Sopenharmony_ci        # Could be generalized to handle more commands in separate methods
16007db96d56Sopenharmony_ci        with sock:
16017db96d56Sopenharmony_ci            [client, _] = sock.accept()
16027db96d56Sopenharmony_ci        with contextlib.ExitStack() as cleanup:
16037db96d56Sopenharmony_ci            cleanup.enter_context(client)
16047db96d56Sopenharmony_ci            reader = cleanup.enter_context(client.makefile('rb'))
16057db96d56Sopenharmony_ci            client.sendall(b'200 Server ready\r\n')
16067db96d56Sopenharmony_ci            while True:
16077db96d56Sopenharmony_ci                cmd = reader.readline()
16087db96d56Sopenharmony_ci                if cmd == b'CAPABILITIES\r\n':
16097db96d56Sopenharmony_ci                    client.sendall(
16107db96d56Sopenharmony_ci                        b'101 Capability list:\r\n'
16117db96d56Sopenharmony_ci                        b'VERSION 2\r\n'
16127db96d56Sopenharmony_ci                        b'STARTTLS\r\n'
16137db96d56Sopenharmony_ci                        b'.\r\n'
16147db96d56Sopenharmony_ci                    )
16157db96d56Sopenharmony_ci                elif cmd == b'STARTTLS\r\n':
16167db96d56Sopenharmony_ci                    reader.close()
16177db96d56Sopenharmony_ci                    client.sendall(b'382 Begin TLS negotiation now\r\n')
16187db96d56Sopenharmony_ci                    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
16197db96d56Sopenharmony_ci                    context.load_cert_chain(certfile)
16207db96d56Sopenharmony_ci                    client = context.wrap_socket(
16217db96d56Sopenharmony_ci                        client, server_side=True)
16227db96d56Sopenharmony_ci                    cleanup.enter_context(client)
16237db96d56Sopenharmony_ci                    reader = cleanup.enter_context(client.makefile('rb'))
16247db96d56Sopenharmony_ci                elif cmd == b'QUIT\r\n':
16257db96d56Sopenharmony_ci                    client.sendall(b'205 Bye!\r\n')
16267db96d56Sopenharmony_ci                    break
16277db96d56Sopenharmony_ci                else:
16287db96d56Sopenharmony_ci                    raise ValueError('Unexpected command {!r}'.format(cmd))
16297db96d56Sopenharmony_ci
16307db96d56Sopenharmony_ci    @unittest.skipUnless(ssl, 'requires SSL support')
16317db96d56Sopenharmony_ci    def test_starttls(self):
16327db96d56Sopenharmony_ci        file = self.nntp.file
16337db96d56Sopenharmony_ci        sock = self.nntp.sock
16347db96d56Sopenharmony_ci        self.nntp.starttls()
16357db96d56Sopenharmony_ci        # Check that the socket and internal pseudo-file really were
16367db96d56Sopenharmony_ci        # changed.
16377db96d56Sopenharmony_ci        self.assertNotEqual(file, self.nntp.file)
16387db96d56Sopenharmony_ci        self.assertNotEqual(sock, self.nntp.sock)
16397db96d56Sopenharmony_ci        # Check that the new socket really is an SSL one
16407db96d56Sopenharmony_ci        self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
16417db96d56Sopenharmony_ci        # Check that trying starttls when it's already active fails.
16427db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.nntp.starttls)
16437db96d56Sopenharmony_ci
16447db96d56Sopenharmony_ci
16457db96d56Sopenharmony_ciif __name__ == "__main__":
16467db96d56Sopenharmony_ci    unittest.main()
1647