17db96d56Sopenharmony_ciimport textwrap
27db96d56Sopenharmony_ciimport unittest
37db96d56Sopenharmony_ciimport contextlib
47db96d56Sopenharmony_cifrom email import policy
57db96d56Sopenharmony_cifrom email import errors
67db96d56Sopenharmony_cifrom test.test_email import TestEmailBase
77db96d56Sopenharmony_ci
87db96d56Sopenharmony_ci
97db96d56Sopenharmony_ciclass TestDefectsBase:
107db96d56Sopenharmony_ci
117db96d56Sopenharmony_ci    policy = policy.default
127db96d56Sopenharmony_ci    raise_expected = False
137db96d56Sopenharmony_ci
147db96d56Sopenharmony_ci    @contextlib.contextmanager
157db96d56Sopenharmony_ci    def _raise_point(self, defect):
167db96d56Sopenharmony_ci        yield
177db96d56Sopenharmony_ci
187db96d56Sopenharmony_ci    def test_same_boundary_inner_outer(self):
197db96d56Sopenharmony_ci        source = textwrap.dedent("""\
207db96d56Sopenharmony_ci            Subject: XX
217db96d56Sopenharmony_ci            From: xx@xx.dk
227db96d56Sopenharmony_ci            To: XX
237db96d56Sopenharmony_ci            Mime-version: 1.0
247db96d56Sopenharmony_ci            Content-type: multipart/mixed;
257db96d56Sopenharmony_ci               boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
267db96d56Sopenharmony_ci
277db96d56Sopenharmony_ci            --MS_Mac_OE_3071477847_720252_MIME_Part
287db96d56Sopenharmony_ci            Content-type: multipart/alternative;
297db96d56Sopenharmony_ci               boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
307db96d56Sopenharmony_ci
317db96d56Sopenharmony_ci            --MS_Mac_OE_3071477847_720252_MIME_Part
327db96d56Sopenharmony_ci            Content-type: text/plain; charset="ISO-8859-1"
337db96d56Sopenharmony_ci            Content-transfer-encoding: quoted-printable
347db96d56Sopenharmony_ci
357db96d56Sopenharmony_ci            text
367db96d56Sopenharmony_ci
377db96d56Sopenharmony_ci            --MS_Mac_OE_3071477847_720252_MIME_Part
387db96d56Sopenharmony_ci            Content-type: text/html; charset="ISO-8859-1"
397db96d56Sopenharmony_ci            Content-transfer-encoding: quoted-printable
407db96d56Sopenharmony_ci
417db96d56Sopenharmony_ci            <HTML></HTML>
427db96d56Sopenharmony_ci
437db96d56Sopenharmony_ci            --MS_Mac_OE_3071477847_720252_MIME_Part--
447db96d56Sopenharmony_ci
457db96d56Sopenharmony_ci            --MS_Mac_OE_3071477847_720252_MIME_Part
467db96d56Sopenharmony_ci            Content-type: image/gif; name="xx.gif";
477db96d56Sopenharmony_ci            Content-disposition: attachment
487db96d56Sopenharmony_ci            Content-transfer-encoding: base64
497db96d56Sopenharmony_ci
507db96d56Sopenharmony_ci            Some removed base64 encoded chars.
517db96d56Sopenharmony_ci
527db96d56Sopenharmony_ci            --MS_Mac_OE_3071477847_720252_MIME_Part--
537db96d56Sopenharmony_ci
547db96d56Sopenharmony_ci            """)
557db96d56Sopenharmony_ci        # XXX better would be to actually detect the duplicate.
567db96d56Sopenharmony_ci        with self._raise_point(errors.StartBoundaryNotFoundDefect):
577db96d56Sopenharmony_ci            msg = self._str_msg(source)
587db96d56Sopenharmony_ci        if self.raise_expected: return
597db96d56Sopenharmony_ci        inner = msg.get_payload(0)
607db96d56Sopenharmony_ci        self.assertTrue(hasattr(inner, 'defects'))
617db96d56Sopenharmony_ci        self.assertEqual(len(self.get_defects(inner)), 1)
627db96d56Sopenharmony_ci        self.assertIsInstance(self.get_defects(inner)[0],
637db96d56Sopenharmony_ci                              errors.StartBoundaryNotFoundDefect)
647db96d56Sopenharmony_ci
657db96d56Sopenharmony_ci    def test_multipart_no_boundary(self):
667db96d56Sopenharmony_ci        source = textwrap.dedent("""\
677db96d56Sopenharmony_ci            Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
687db96d56Sopenharmony_ci            From: foobar
697db96d56Sopenharmony_ci            Subject: broken mail
707db96d56Sopenharmony_ci            MIME-Version: 1.0
717db96d56Sopenharmony_ci            Content-Type: multipart/report; report-type=delivery-status;
727db96d56Sopenharmony_ci
737db96d56Sopenharmony_ci            --JAB03225.986577786/zinfandel.lacita.com
747db96d56Sopenharmony_ci
757db96d56Sopenharmony_ci            One part
767db96d56Sopenharmony_ci
777db96d56Sopenharmony_ci            --JAB03225.986577786/zinfandel.lacita.com
787db96d56Sopenharmony_ci            Content-Type: message/delivery-status
797db96d56Sopenharmony_ci
807db96d56Sopenharmony_ci            Header: Another part
817db96d56Sopenharmony_ci
827db96d56Sopenharmony_ci            --JAB03225.986577786/zinfandel.lacita.com--
837db96d56Sopenharmony_ci            """)
847db96d56Sopenharmony_ci        with self._raise_point(errors.NoBoundaryInMultipartDefect):
857db96d56Sopenharmony_ci            msg = self._str_msg(source)
867db96d56Sopenharmony_ci        if self.raise_expected: return
877db96d56Sopenharmony_ci        self.assertIsInstance(msg.get_payload(), str)
887db96d56Sopenharmony_ci        self.assertEqual(len(self.get_defects(msg)), 2)
897db96d56Sopenharmony_ci        self.assertIsInstance(self.get_defects(msg)[0],
907db96d56Sopenharmony_ci                              errors.NoBoundaryInMultipartDefect)
917db96d56Sopenharmony_ci        self.assertIsInstance(self.get_defects(msg)[1],
927db96d56Sopenharmony_ci                              errors.MultipartInvariantViolationDefect)
937db96d56Sopenharmony_ci
947db96d56Sopenharmony_ci    multipart_msg = textwrap.dedent("""\
957db96d56Sopenharmony_ci        Date: Wed, 14 Nov 2007 12:56:23 GMT
967db96d56Sopenharmony_ci        From: foo@bar.invalid
977db96d56Sopenharmony_ci        To: foo@bar.invalid
987db96d56Sopenharmony_ci        Subject: Content-Transfer-Encoding: base64 and multipart
997db96d56Sopenharmony_ci        MIME-Version: 1.0
1007db96d56Sopenharmony_ci        Content-Type: multipart/mixed;
1017db96d56Sopenharmony_ci            boundary="===============3344438784458119861=="{}
1027db96d56Sopenharmony_ci
1037db96d56Sopenharmony_ci        --===============3344438784458119861==
1047db96d56Sopenharmony_ci        Content-Type: text/plain
1057db96d56Sopenharmony_ci
1067db96d56Sopenharmony_ci        Test message
1077db96d56Sopenharmony_ci
1087db96d56Sopenharmony_ci        --===============3344438784458119861==
1097db96d56Sopenharmony_ci        Content-Type: application/octet-stream
1107db96d56Sopenharmony_ci        Content-Transfer-Encoding: base64
1117db96d56Sopenharmony_ci
1127db96d56Sopenharmony_ci        YWJj
1137db96d56Sopenharmony_ci
1147db96d56Sopenharmony_ci        --===============3344438784458119861==--
1157db96d56Sopenharmony_ci        """)
1167db96d56Sopenharmony_ci
1177db96d56Sopenharmony_ci    def test_multipart_invalid_cte(self):
1187db96d56Sopenharmony_ci        with self._raise_point(
1197db96d56Sopenharmony_ci                errors.InvalidMultipartContentTransferEncodingDefect):
1207db96d56Sopenharmony_ci            msg = self._str_msg(
1217db96d56Sopenharmony_ci                    self.multipart_msg.format(
1227db96d56Sopenharmony_ci                        "\nContent-Transfer-Encoding: base64"))
1237db96d56Sopenharmony_ci        if self.raise_expected: return
1247db96d56Sopenharmony_ci        self.assertEqual(len(self.get_defects(msg)), 1)
1257db96d56Sopenharmony_ci        self.assertIsInstance(self.get_defects(msg)[0],
1267db96d56Sopenharmony_ci            errors.InvalidMultipartContentTransferEncodingDefect)
1277db96d56Sopenharmony_ci
1287db96d56Sopenharmony_ci    def test_multipart_no_cte_no_defect(self):
1297db96d56Sopenharmony_ci        if self.raise_expected: return
1307db96d56Sopenharmony_ci        msg = self._str_msg(self.multipart_msg.format(''))
1317db96d56Sopenharmony_ci        self.assertEqual(len(self.get_defects(msg)), 0)
1327db96d56Sopenharmony_ci
1337db96d56Sopenharmony_ci    def test_multipart_valid_cte_no_defect(self):
1347db96d56Sopenharmony_ci        if self.raise_expected: return
1357db96d56Sopenharmony_ci        for cte in ('7bit', '8bit', 'BINary'):
1367db96d56Sopenharmony_ci            msg = self._str_msg(
1377db96d56Sopenharmony_ci                self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
1387db96d56Sopenharmony_ci            self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
1397db96d56Sopenharmony_ci
1407db96d56Sopenharmony_ci    def test_lying_multipart(self):
1417db96d56Sopenharmony_ci        source = textwrap.dedent("""\
1427db96d56Sopenharmony_ci            From: "Allison Dunlap" <xxx@example.com>
1437db96d56Sopenharmony_ci            To: yyy@example.com
1447db96d56Sopenharmony_ci            Subject: 64423
1457db96d56Sopenharmony_ci            Date: Sun, 11 Jul 2004 16:09:27 -0300
1467db96d56Sopenharmony_ci            MIME-Version: 1.0
1477db96d56Sopenharmony_ci            Content-Type: multipart/alternative;
1487db96d56Sopenharmony_ci
1497db96d56Sopenharmony_ci            Blah blah blah
1507db96d56Sopenharmony_ci            """)
1517db96d56Sopenharmony_ci        with self._raise_point(errors.NoBoundaryInMultipartDefect):
1527db96d56Sopenharmony_ci            msg = self._str_msg(source)
1537db96d56Sopenharmony_ci        if self.raise_expected: return
1547db96d56Sopenharmony_ci        self.assertTrue(hasattr(msg, 'defects'))
1557db96d56Sopenharmony_ci        self.assertEqual(len(self.get_defects(msg)), 2)
1567db96d56Sopenharmony_ci        self.assertIsInstance(self.get_defects(msg)[0],
1577db96d56Sopenharmony_ci                              errors.NoBoundaryInMultipartDefect)
1587db96d56Sopenharmony_ci        self.assertIsInstance(self.get_defects(msg)[1],
1597db96d56Sopenharmony_ci                              errors.MultipartInvariantViolationDefect)
1607db96d56Sopenharmony_ci
1617db96d56Sopenharmony_ci    def test_missing_start_boundary(self):
1627db96d56Sopenharmony_ci        source = textwrap.dedent("""\
1637db96d56Sopenharmony_ci            Content-Type: multipart/mixed; boundary="AAA"
1647db96d56Sopenharmony_ci            From: Mail Delivery Subsystem <xxx@example.com>
1657db96d56Sopenharmony_ci            To: yyy@example.com
1667db96d56Sopenharmony_ci
1677db96d56Sopenharmony_ci            --AAA
1687db96d56Sopenharmony_ci
1697db96d56Sopenharmony_ci            Stuff
1707db96d56Sopenharmony_ci
1717db96d56Sopenharmony_ci            --AAA
1727db96d56Sopenharmony_ci            Content-Type: message/rfc822
1737db96d56Sopenharmony_ci
1747db96d56Sopenharmony_ci            From: webmaster@python.org
1757db96d56Sopenharmony_ci            To: zzz@example.com
1767db96d56Sopenharmony_ci            Content-Type: multipart/mixed; boundary="BBB"
1777db96d56Sopenharmony_ci
1787db96d56Sopenharmony_ci            --BBB--
1797db96d56Sopenharmony_ci
1807db96d56Sopenharmony_ci            --AAA--
1817db96d56Sopenharmony_ci
1827db96d56Sopenharmony_ci            """)
1837db96d56Sopenharmony_ci        # The message structure is:
1847db96d56Sopenharmony_ci        #
1857db96d56Sopenharmony_ci        # multipart/mixed
1867db96d56Sopenharmony_ci        #    text/plain
1877db96d56Sopenharmony_ci        #    message/rfc822
1887db96d56Sopenharmony_ci        #        multipart/mixed [*]
1897db96d56Sopenharmony_ci        #
1907db96d56Sopenharmony_ci        # [*] This message is missing its start boundary
1917db96d56Sopenharmony_ci        with self._raise_point(errors.StartBoundaryNotFoundDefect):
1927db96d56Sopenharmony_ci            outer = self._str_msg(source)
1937db96d56Sopenharmony_ci        if self.raise_expected: return
1947db96d56Sopenharmony_ci        bad = outer.get_payload(1).get_payload(0)
1957db96d56Sopenharmony_ci        self.assertEqual(len(self.get_defects(bad)), 1)
1967db96d56Sopenharmony_ci        self.assertIsInstance(self.get_defects(bad)[0],
1977db96d56Sopenharmony_ci                              errors.StartBoundaryNotFoundDefect)
1987db96d56Sopenharmony_ci
1997db96d56Sopenharmony_ci    def test_first_line_is_continuation_header(self):
2007db96d56Sopenharmony_ci        with self._raise_point(errors.FirstHeaderLineIsContinuationDefect):
2017db96d56Sopenharmony_ci            msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
2027db96d56Sopenharmony_ci        if self.raise_expected: return
2037db96d56Sopenharmony_ci        self.assertEqual(msg.keys(), ['Subject'])
2047db96d56Sopenharmony_ci        self.assertEqual(msg.get_payload(), 'body')
2057db96d56Sopenharmony_ci        self.assertEqual(len(self.get_defects(msg)), 1)
2067db96d56Sopenharmony_ci        self.assertDefectsEqual(self.get_defects(msg),
2077db96d56Sopenharmony_ci                                 [errors.FirstHeaderLineIsContinuationDefect])
2087db96d56Sopenharmony_ci        self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
2097db96d56Sopenharmony_ci
2107db96d56Sopenharmony_ci    def test_missing_header_body_separator(self):
2117db96d56Sopenharmony_ci        # Our heuristic if we see a line that doesn't look like a header (no
2127db96d56Sopenharmony_ci        # leading whitespace but no ':') is to assume that the blank line that
2137db96d56Sopenharmony_ci        # separates the header from the body is missing, and to stop parsing
2147db96d56Sopenharmony_ci        # headers and start parsing the body.
2157db96d56Sopenharmony_ci        with self._raise_point(errors.MissingHeaderBodySeparatorDefect):
2167db96d56Sopenharmony_ci            msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
2177db96d56Sopenharmony_ci        if self.raise_expected: return
2187db96d56Sopenharmony_ci        self.assertEqual(msg.keys(), ['Subject'])
2197db96d56Sopenharmony_ci        self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
2207db96d56Sopenharmony_ci        self.assertDefectsEqual(self.get_defects(msg),
2217db96d56Sopenharmony_ci                                [errors.MissingHeaderBodySeparatorDefect])
2227db96d56Sopenharmony_ci
2237db96d56Sopenharmony_ci    def test_bad_padding_in_base64_payload(self):
2247db96d56Sopenharmony_ci        source = textwrap.dedent("""\
2257db96d56Sopenharmony_ci            Subject: test
2267db96d56Sopenharmony_ci            MIME-Version: 1.0
2277db96d56Sopenharmony_ci            Content-Type: text/plain; charset="utf-8"
2287db96d56Sopenharmony_ci            Content-Transfer-Encoding: base64
2297db96d56Sopenharmony_ci
2307db96d56Sopenharmony_ci            dmk
2317db96d56Sopenharmony_ci            """)
2327db96d56Sopenharmony_ci        msg = self._str_msg(source)
2337db96d56Sopenharmony_ci        with self._raise_point(errors.InvalidBase64PaddingDefect):
2347db96d56Sopenharmony_ci            payload = msg.get_payload(decode=True)
2357db96d56Sopenharmony_ci        if self.raise_expected: return
2367db96d56Sopenharmony_ci        self.assertEqual(payload, b'vi')
2377db96d56Sopenharmony_ci        self.assertDefectsEqual(self.get_defects(msg),
2387db96d56Sopenharmony_ci                                [errors.InvalidBase64PaddingDefect])
2397db96d56Sopenharmony_ci
2407db96d56Sopenharmony_ci    def test_invalid_chars_in_base64_payload(self):
2417db96d56Sopenharmony_ci        source = textwrap.dedent("""\
2427db96d56Sopenharmony_ci            Subject: test
2437db96d56Sopenharmony_ci            MIME-Version: 1.0
2447db96d56Sopenharmony_ci            Content-Type: text/plain; charset="utf-8"
2457db96d56Sopenharmony_ci            Content-Transfer-Encoding: base64
2467db96d56Sopenharmony_ci
2477db96d56Sopenharmony_ci            dm\x01k===
2487db96d56Sopenharmony_ci            """)
2497db96d56Sopenharmony_ci        msg = self._str_msg(source)
2507db96d56Sopenharmony_ci        with self._raise_point(errors.InvalidBase64CharactersDefect):
2517db96d56Sopenharmony_ci            payload = msg.get_payload(decode=True)
2527db96d56Sopenharmony_ci        if self.raise_expected: return
2537db96d56Sopenharmony_ci        self.assertEqual(payload, b'vi')
2547db96d56Sopenharmony_ci        self.assertDefectsEqual(self.get_defects(msg),
2557db96d56Sopenharmony_ci                                [errors.InvalidBase64CharactersDefect])
2567db96d56Sopenharmony_ci
2577db96d56Sopenharmony_ci    def test_invalid_length_of_base64_payload(self):
2587db96d56Sopenharmony_ci        source = textwrap.dedent("""\
2597db96d56Sopenharmony_ci            Subject: test
2607db96d56Sopenharmony_ci            MIME-Version: 1.0
2617db96d56Sopenharmony_ci            Content-Type: text/plain; charset="utf-8"
2627db96d56Sopenharmony_ci            Content-Transfer-Encoding: base64
2637db96d56Sopenharmony_ci
2647db96d56Sopenharmony_ci            abcde
2657db96d56Sopenharmony_ci            """)
2667db96d56Sopenharmony_ci        msg = self._str_msg(source)
2677db96d56Sopenharmony_ci        with self._raise_point(errors.InvalidBase64LengthDefect):
2687db96d56Sopenharmony_ci            payload = msg.get_payload(decode=True)
2697db96d56Sopenharmony_ci        if self.raise_expected: return
2707db96d56Sopenharmony_ci        self.assertEqual(payload, b'abcde')
2717db96d56Sopenharmony_ci        self.assertDefectsEqual(self.get_defects(msg),
2727db96d56Sopenharmony_ci                                [errors.InvalidBase64LengthDefect])
2737db96d56Sopenharmony_ci
2747db96d56Sopenharmony_ci    def test_missing_ending_boundary(self):
2757db96d56Sopenharmony_ci        source = textwrap.dedent("""\
2767db96d56Sopenharmony_ci            To: 1@harrydomain4.com
2777db96d56Sopenharmony_ci            Subject: Fwd: 1
2787db96d56Sopenharmony_ci            MIME-Version: 1.0
2797db96d56Sopenharmony_ci            Content-Type: multipart/alternative;
2807db96d56Sopenharmony_ci             boundary="------------000101020201080900040301"
2817db96d56Sopenharmony_ci
2827db96d56Sopenharmony_ci            --------------000101020201080900040301
2837db96d56Sopenharmony_ci            Content-Type: text/plain; charset=ISO-8859-1
2847db96d56Sopenharmony_ci            Content-Transfer-Encoding: 7bit
2857db96d56Sopenharmony_ci
2867db96d56Sopenharmony_ci            Alternative 1
2877db96d56Sopenharmony_ci
2887db96d56Sopenharmony_ci            --------------000101020201080900040301
2897db96d56Sopenharmony_ci            Content-Type: text/html; charset=ISO-8859-1
2907db96d56Sopenharmony_ci            Content-Transfer-Encoding: 7bit
2917db96d56Sopenharmony_ci
2927db96d56Sopenharmony_ci            Alternative 2
2937db96d56Sopenharmony_ci
2947db96d56Sopenharmony_ci            """)
2957db96d56Sopenharmony_ci        with self._raise_point(errors.CloseBoundaryNotFoundDefect):
2967db96d56Sopenharmony_ci            msg = self._str_msg(source)
2977db96d56Sopenharmony_ci        if self.raise_expected: return
2987db96d56Sopenharmony_ci        self.assertEqual(len(msg.get_payload()), 2)
2997db96d56Sopenharmony_ci        self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n')
3007db96d56Sopenharmony_ci        self.assertDefectsEqual(self.get_defects(msg),
3017db96d56Sopenharmony_ci                                [errors.CloseBoundaryNotFoundDefect])
3027db96d56Sopenharmony_ci
3037db96d56Sopenharmony_ci
3047db96d56Sopenharmony_ciclass TestDefectDetection(TestDefectsBase, TestEmailBase):
3057db96d56Sopenharmony_ci
3067db96d56Sopenharmony_ci    def get_defects(self, obj):
3077db96d56Sopenharmony_ci        return obj.defects
3087db96d56Sopenharmony_ci
3097db96d56Sopenharmony_ci
3107db96d56Sopenharmony_ciclass TestDefectCapture(TestDefectsBase, TestEmailBase):
3117db96d56Sopenharmony_ci
3127db96d56Sopenharmony_ci    class CapturePolicy(policy.EmailPolicy):
3137db96d56Sopenharmony_ci        captured = None
3147db96d56Sopenharmony_ci        def register_defect(self, obj, defect):
3157db96d56Sopenharmony_ci            self.captured.append(defect)
3167db96d56Sopenharmony_ci
3177db96d56Sopenharmony_ci    def setUp(self):
3187db96d56Sopenharmony_ci        self.policy = self.CapturePolicy(captured=list())
3197db96d56Sopenharmony_ci
3207db96d56Sopenharmony_ci    def get_defects(self, obj):
3217db96d56Sopenharmony_ci        return self.policy.captured
3227db96d56Sopenharmony_ci
3237db96d56Sopenharmony_ci
3247db96d56Sopenharmony_ciclass TestDefectRaising(TestDefectsBase, TestEmailBase):
3257db96d56Sopenharmony_ci
3267db96d56Sopenharmony_ci    policy = TestDefectsBase.policy
3277db96d56Sopenharmony_ci    policy = policy.clone(raise_on_defect=True)
3287db96d56Sopenharmony_ci    raise_expected = True
3297db96d56Sopenharmony_ci
3307db96d56Sopenharmony_ci    @contextlib.contextmanager
3317db96d56Sopenharmony_ci    def _raise_point(self, defect):
3327db96d56Sopenharmony_ci        with self.assertRaises(defect):
3337db96d56Sopenharmony_ci            yield
3347db96d56Sopenharmony_ci
3357db96d56Sopenharmony_ci
3367db96d56Sopenharmony_ciif __name__ == '__main__':
3377db96d56Sopenharmony_ci    unittest.main()
338