17db96d56Sopenharmony_ciimport binascii
27db96d56Sopenharmony_ciimport email.charset
37db96d56Sopenharmony_ciimport email.message
47db96d56Sopenharmony_ciimport email.errors
57db96d56Sopenharmony_cifrom email import quoprimime
67db96d56Sopenharmony_ci
class ContentManager:
    """Registry that dispatches message content access to handler callables.

    ``get_handlers`` maps content-type strings to callables used by
    get_content(); ``set_handlers`` maps types (or type-name strings, or
    None) to callables used by set_content().
    """

    def __init__(self):
        self.get_handlers = {}
        self.set_handlers = {}

    def add_get_handler(self, key, handler):
        """Register *handler* under *key* for lookup by get_content()."""
        self.get_handlers[key] = handler

    def get_content(self, msg, *args, **kw):
        """Call the best matching get handler for *msg* and return its result.

        Keys are tried from most to least specific: the full content type,
        then the maintype, then the empty string.  The handler is invoked as
        ``handler(msg, *args, **kw)``.  KeyError (carrying the full content
        type) is raised when none of the three keys is registered.
        """
        registry = self.get_handlers
        full_type = msg.get_content_type()
        if full_type in registry:
            key = full_type
        else:
            main_type = msg.get_content_maintype()
            if main_type in registry:
                key = main_type
            elif '' in registry:
                key = ''
            else:
                raise KeyError(full_type)
        return registry[key](msg, *args, **kw)

    def add_set_handler(self, typekey, handler):
        """Register *handler* under *typekey* for lookup by set_content()."""
        self.set_handlers[typekey] = handler

    def set_content(self, msg, obj, *args, **kw):
        """Store *obj* in *msg* using the set handler registered for its type.

        Raises TypeError if *msg* is currently a multipart.  The handler is
        resolved before the existing content is cleared, so a failed lookup
        leaves *msg* unmodified.
        """
        if msg.get_content_maintype() == 'multipart':
            # XXX: is this error a good idea or not?  We can remove it later,
            # but we can't add it later, so do it for now.
            raise TypeError("set_content not valid on multipart")
        setter = self._find_set_handler(msg, obj)
        msg.clear_content()
        setter(msg, obj, *args, **kw)

    def _find_set_handler(self, msg, obj):
        """Return the set handler for *obj*, walking its type's MRO.

        For each class in the MRO the following keys are tried in order: the
        class object, its 'module.qualname' string, its qualname, and its
        name.  If nothing matches, the handler registered under None is used.
        Otherwise KeyError is raised with the dotted path of type(obj).
        """
        registry = self.set_handlers
        reported_path = None
        for klass in type(obj).__mro__:
            if klass in registry:
                return registry[klass]
            qualname = klass.__qualname__
            module = getattr(klass, '__module__', '')
            dotted = '.'.join((module, qualname)) if module else qualname
            if reported_path is None:
                # Only the most derived class is named in the error.
                reported_path = dotted
            for key in (dotted, qualname, klass.__name__):
                if key in registry:
                    return registry[key]
        if None in registry:
            return registry[None]
        raise KeyError(reported_path)


raw_data_manager = ContentManager()
627db96d56Sopenharmony_ci
637db96d56Sopenharmony_ci
def get_text_content(msg, errors='replace'):
    """Return the payload of a text part as a str.

    The transfer-decoded payload is decoded with the part's ``charset``
    parameter (ASCII when absent); *errors* is passed through to
    bytes.decode().
    """
    raw = msg.get_payload(decode=True)
    return raw.decode(msg.get_param('charset', 'ASCII'), errors=errors)
raw_data_manager.add_get_handler('text', get_text_content)
697db96d56Sopenharmony_ci
707db96d56Sopenharmony_ci
def get_non_text_content(msg):
    """Return the transfer-decoded payload of *msg* as returned by get_payload."""
    payload = msg.get_payload(decode=True)
    return payload
# The same handler is registered for each of these maintypes.
for maintype in ('audio', 'image', 'video', 'application'):
    raw_data_manager.add_get_handler(maintype, get_non_text_content)
del maintype
767db96d56Sopenharmony_ci
777db96d56Sopenharmony_ci
def get_message_content(msg):
    """Return the first (index 0) sub-payload of a message/* part."""
    embedded = msg.get_payload(0)
    return embedded
# Registered under the full content types 'message/rfc822' and
# 'message/external-body'.
for subtype in ('rfc822', 'external-body'):
    raw_data_manager.add_get_handler('message/' + subtype, get_message_content)
del subtype
837db96d56Sopenharmony_ci
847db96d56Sopenharmony_ci
def get_and_fixup_unknown_message_content(msg):
    """Return the first sub-payload of *msg* converted to bytes.

    Registered for the 'message' maintype, so it serves message subtypes
    that have no more specific handler.
    """
    # If we don't understand a message subtype, we are supposed to treat it as
    # if it were application/octet-stream, per
    # tools.ietf.org/html/rfc2046#section-5.2.4.  Feedparser doesn't do that,
    # so do our best to fix things up.  Note that it is *not* appropriate to
    # model message/partial content as Message objects, so they are handled
    # here as well.  (How to reassemble them is out of scope for this comment :)
    first_part = msg.get_payload(0)
    return bytes(first_part)
raw_data_manager.add_get_handler(
    'message', get_and_fixup_unknown_message_content)
957db96d56Sopenharmony_ci
967db96d56Sopenharmony_ci
977db96d56Sopenharmony_cidef _prepare_set(msg, maintype, subtype, headers):
987db96d56Sopenharmony_ci    msg['Content-Type'] = '/'.join((maintype, subtype))
997db96d56Sopenharmony_ci    if headers:
1007db96d56Sopenharmony_ci        if not hasattr(headers[0], 'name'):
1017db96d56Sopenharmony_ci            mp = msg.policy
1027db96d56Sopenharmony_ci            headers = [mp.header_factory(*mp.header_source_parse([header]))
1037db96d56Sopenharmony_ci                       for header in headers]
1047db96d56Sopenharmony_ci        try:
1057db96d56Sopenharmony_ci            for header in headers:
1067db96d56Sopenharmony_ci                if header.defects:
1077db96d56Sopenharmony_ci                    raise header.defects[0]
1087db96d56Sopenharmony_ci                msg[header.name] = header
1097db96d56Sopenharmony_ci        except email.errors.HeaderDefect as exc:
1107db96d56Sopenharmony_ci            raise ValueError("Invalid header: {}".format(
1117db96d56Sopenharmony_ci                                header.fold(policy=msg.policy))) from exc
1127db96d56Sopenharmony_ci
1137db96d56Sopenharmony_ci
1147db96d56Sopenharmony_cidef _finalize_set(msg, disposition, filename, cid, params):
1157db96d56Sopenharmony_ci    if disposition is None and filename is not None:
1167db96d56Sopenharmony_ci        disposition = 'attachment'
1177db96d56Sopenharmony_ci    if disposition is not None:
1187db96d56Sopenharmony_ci        msg['Content-Disposition'] = disposition
1197db96d56Sopenharmony_ci    if filename is not None:
1207db96d56Sopenharmony_ci        msg.set_param('filename',
1217db96d56Sopenharmony_ci                      filename,
1227db96d56Sopenharmony_ci                      header='Content-Disposition',
1237db96d56Sopenharmony_ci                      replace=True)
1247db96d56Sopenharmony_ci    if cid is not None:
1257db96d56Sopenharmony_ci        msg['Content-ID'] = cid
1267db96d56Sopenharmony_ci    if params is not None:
1277db96d56Sopenharmony_ci        for key, value in params.items():
1287db96d56Sopenharmony_ci            msg.set_param(key, value)
1297db96d56Sopenharmony_ci
1307db96d56Sopenharmony_ci
1317db96d56Sopenharmony_ci# XXX: This is a cleaned-up version of base64mime.body_encode (including a bug
1327db96d56Sopenharmony_ci# fix in the calculation of unencoded_bytes_per_line).  It would be nice to
1337db96d56Sopenharmony_ci# drop both this and quoprimime.body_encode in favor of enhanced binascii
1347db96d56Sopenharmony_ci# routines that accepted a max_line_length parameter.
1357db96d56Sopenharmony_cidef _encode_base64(data, max_line_length):
1367db96d56Sopenharmony_ci    encoded_lines = []
1377db96d56Sopenharmony_ci    unencoded_bytes_per_line = max_line_length // 4 * 3
1387db96d56Sopenharmony_ci    for i in range(0, len(data), unencoded_bytes_per_line):
1397db96d56Sopenharmony_ci        thisline = data[i:i+unencoded_bytes_per_line]
1407db96d56Sopenharmony_ci        encoded_lines.append(binascii.b2a_base64(thisline).decode('ascii'))
1417db96d56Sopenharmony_ci    return ''.join(encoded_lines)
1427db96d56Sopenharmony_ci
1437db96d56Sopenharmony_ci
1447db96d56Sopenharmony_cidef _encode_text(string, charset, cte, policy):
1457db96d56Sopenharmony_ci    lines = string.encode(charset).splitlines()
1467db96d56Sopenharmony_ci    linesep = policy.linesep.encode('ascii')
1477db96d56Sopenharmony_ci    def embedded_body(lines): return linesep.join(lines) + linesep
1487db96d56Sopenharmony_ci    def normal_body(lines): return b'\n'.join(lines) + b'\n'
1497db96d56Sopenharmony_ci    if cte is None:
1507db96d56Sopenharmony_ci        # Use heuristics to decide on the "best" encoding.
1517db96d56Sopenharmony_ci        if max((len(x) for x in lines), default=0) <= policy.max_line_length:
1527db96d56Sopenharmony_ci            try:
1537db96d56Sopenharmony_ci                return '7bit', normal_body(lines).decode('ascii')
1547db96d56Sopenharmony_ci            except UnicodeDecodeError:
1557db96d56Sopenharmony_ci                pass
1567db96d56Sopenharmony_ci            if policy.cte_type == '8bit':
1577db96d56Sopenharmony_ci                return '8bit', normal_body(lines).decode('ascii', 'surrogateescape')
1587db96d56Sopenharmony_ci        sniff = embedded_body(lines[:10])
1597db96d56Sopenharmony_ci        sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'),
1607db96d56Sopenharmony_ci                                          policy.max_line_length)
1617db96d56Sopenharmony_ci        sniff_base64 = binascii.b2a_base64(sniff)
1627db96d56Sopenharmony_ci        # This is a little unfair to qp; it includes lineseps, base64 doesn't.
1637db96d56Sopenharmony_ci        if len(sniff_qp) > len(sniff_base64):
1647db96d56Sopenharmony_ci            cte = 'base64'
1657db96d56Sopenharmony_ci        else:
1667db96d56Sopenharmony_ci            cte = 'quoted-printable'
1677db96d56Sopenharmony_ci            if len(lines) <= 10:
1687db96d56Sopenharmony_ci                return cte, sniff_qp
1697db96d56Sopenharmony_ci    if cte == '7bit':
1707db96d56Sopenharmony_ci        data = normal_body(lines).decode('ascii')
1717db96d56Sopenharmony_ci    elif cte == '8bit':
1727db96d56Sopenharmony_ci        data = normal_body(lines).decode('ascii', 'surrogateescape')
1737db96d56Sopenharmony_ci    elif cte == 'quoted-printable':
1747db96d56Sopenharmony_ci        data = quoprimime.body_encode(normal_body(lines).decode('latin-1'),
1757db96d56Sopenharmony_ci                                      policy.max_line_length)
1767db96d56Sopenharmony_ci    elif cte == 'base64':
1777db96d56Sopenharmony_ci        data = _encode_base64(embedded_body(lines), policy.max_line_length)
1787db96d56Sopenharmony_ci    else:
1797db96d56Sopenharmony_ci        raise ValueError("Unknown content transfer encoding {}".format(cte))
1807db96d56Sopenharmony_ci    return cte, data
1817db96d56Sopenharmony_ci
1827db96d56Sopenharmony_ci
def set_text_content(msg, string, subtype="plain", charset='utf-8', cte=None,
                     disposition=None, filename=None, cid=None,
                     params=None, headers=None):
    """Make *msg* a text/*subtype* part whose body is *string*.

    The text is encoded by _encode_text(), which also picks the transfer
    encoding when *cte* is None.  The charset parameter is written using the
    email.charset.ALIASES entry for *charset* when one exists.  The
    remaining arguments are forwarded to _prepare_set() and _finalize_set().
    """
    _prepare_set(msg, 'text', subtype, headers)
    chosen_cte, body = _encode_text(string, charset, cte, msg.policy)
    msg.set_payload(body)
    canonical_charset = email.charset.ALIASES.get(charset, charset)
    msg.set_param('charset', canonical_charset, replace=True)
    msg['Content-Transfer-Encoding'] = chosen_cte
    _finalize_set(msg, disposition, filename, cid, params)
raw_data_manager.add_set_handler(str, set_text_content)
1957db96d56Sopenharmony_ci
1967db96d56Sopenharmony_ci
def set_message_content(msg, message, subtype="rfc822", cte=None,
                       disposition=None, filename=None, cid=None,
                       params=None, headers=None):
    """Make *msg* a message/*subtype* part wrapping *message*.

    Raises ValueError for subtype 'partial', and for a *cte* that the
    subtype does not permit: rfc822 accepts 7bit, 8bit or binary (8bit when
    None), external-body accepts only 7bit.  Any other subtype uses the
    given *cte*, or 7bit when it is None.
    """
    if subtype == 'partial':
        raise ValueError("message/partial is not supported for Message objects")

    if subtype == 'rfc822':
        if cte not in (None, '7bit', '8bit', 'binary'):
            # http://tools.ietf.org/html/rfc2046#section-5.2.1 mandate.
            raise ValueError(
                "message/rfc822 parts do not support cte={}".format(cte))
        # 8bit will get coerced on serialization if policy.cte_type='7bit'.  We
        # may end up claiming 8bit when it isn't needed, but the only negative
        # result of that should be a gateway that needs to coerce to 7bit
        # having to look through the whole embedded message to discover whether
        # or not it actually has to do anything.
        if cte is None:
            cte = '8bit'
    elif subtype == 'external-body':
        if cte not in (None, '7bit'):
            # http://tools.ietf.org/html/rfc2046#section-5.2.3 mandate.
            raise ValueError(
                "message/external-body parts do not support cte={}".format(cte))
        cte = '7bit'
    elif cte is None:
        # http://tools.ietf.org/html/rfc2046#section-5.2.4 says all future
        # subtypes should be restricted to 7bit, so assume that.
        cte = '7bit'

    _prepare_set(msg, 'message', subtype, headers)
    # The wrapped message is stored as a one-element payload list.
    msg.set_payload([message])
    msg['Content-Transfer-Encoding'] = cte
    _finalize_set(msg, disposition, filename, cid, params)
raw_data_manager.add_set_handler(email.message.Message, set_message_content)
2287db96d56Sopenharmony_ci
2297db96d56Sopenharmony_ci
def set_bytes_content(msg, data, maintype, subtype, cte='base64',
                     disposition=None, filename=None, cid=None,
                     params=None, headers=None):
    """Make *msg* a *maintype*/*subtype* part holding the binary *data*.

    *data* may be bytes, bytearray or memoryview.  *cte* selects how it is
    turned into the str payload: 'base64' (default), 'quoted-printable',
    '7bit' (must be pure ASCII, else UnicodeDecodeError), or '8bit'/'binary'
    (non-ASCII bytes kept via surrogateescape).  Any other *cte* stores
    *data* unchanged.  The remaining arguments are forwarded to
    _prepare_set() and _finalize_set().
    """
    _prepare_set(msg, maintype, subtype, headers)
    if cte == 'base64':
        data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
    elif cte == 'quoted-printable':
        # XXX: quoprimime.body_encode won't encode newline characters in data,
        # so we can't use it.  This means max_line_length is ignored.  Another
        # bug to fix later.  (Note: encoders.quopri is broken on line ends.)
        data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
        data = data.decode('ascii')
    elif cte == '7bit':
        # memoryview has no decode() method; go through bytes so every
        # registered input type works.
        data = bytes(data).decode('ascii')
    elif cte in ('8bit', 'binary'):
        data = bytes(data).decode('ascii', 'surrogateescape')
    msg.set_payload(data)
    msg['Content-Transfer-Encoding'] = cte
    _finalize_set(msg, disposition, filename, cid, params)
for typ in (bytes, bytearray, memoryview):
    raw_data_manager.add_set_handler(typ, set_bytes_content)
del typ
252