From 4b3d08137ef53072edd6ddc43cf1f60fce920d9e Mon Sep 17 00:00:00 2001 From: androiddrew Date: Fri, 19 Jan 2018 16:34:45 -0500 Subject: [PATCH] Ported flask_mail code to apistar constructs --- apistar_mail/__init__.py | 1 + apistar_mail/exc.py | 12 + apistar_mail/mail.py | 484 +++++++++++++++++++++++++++++++++++-- tests/test_apistar_mail.py | 45 +++- 4 files changed, 516 insertions(+), 26 deletions(-) create mode 100644 apistar_mail/exc.py diff --git a/apistar_mail/__init__.py b/apistar_mail/__init__.py index e69de29..c12f34c 100644 --- a/apistar_mail/__init__.py +++ b/apistar_mail/__init__.py @@ -0,0 +1 @@ +__version__ = '0.1.1' \ No newline at end of file diff --git a/apistar_mail/exc.py b/apistar_mail/exc.py new file mode 100644 index 0000000..9d33a72 --- /dev/null +++ b/apistar_mail/exc.py @@ -0,0 +1,12 @@ +class MailUnicodeDecodeError(UnicodeDecodeError): + def __init__(self, obj, *args): + self.obj = obj + UnicodeDecodeError.__init__(self, *args) + + def __str__(self): + original = UnicodeDecodeError.__str__(self) + return '%s. You passed in %r (%s)' % (original, self.obj, type(self.obj)) + + +class BadHeaderError(Exception): + pass diff --git a/apistar_mail/mail.py b/apistar_mail/mail.py index 64151e9..9bd43cd 100644 --- a/apistar_mail/mail.py +++ b/apistar_mail/mail.py @@ -1,39 +1,485 @@ -import email +import re import smtplib -import typing +import time +import unicodedata + +from email import charset, policy +from email.encoders import encode_base64 +from email.mime.base import MIMEBase +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.header import Header +from email.utils import formatdate, formataddr, make_msgid, parseaddr + +from apistar import Settings + +from .exc import MailUnicodeDecodeError, BadHeaderError + +# TODO Figure out why this is necessary +charset.add_charset('utf-8', charset.SHORTEST, None, 'utf-8') + + +# TODO ok there is something going here that you may not really grok yet. check +def force_text(s, encoding='utf-8', errors='strict', ): + """ + Similar to smart_text, except that lazy instances are resolved to + strings, rather than kept as lazy objects. + + If strings_only is True, don't convert (some) non-string-like objects. + """ + if isinstance(s, str): + return s + + try: + if not isinstance(s, str): + if isinstance(s, bytes): + s = s.decode(encoding, errors) + else: + s = str(s) + + except UnicodeDecodeError as e: + try: + if not isinstance(s, Exception): + raise MailUnicodeDecodeError(s, *e.args) + else: + s = ' '.join([force_text(arg, encoding, errors) for arg in s]) + except: + raise MailUnicodeDecodeError(s, *e.args) + + return s + + # try: + # if not isinstance(s, string_types): + # if PY3: + # if isinstance(s, bytes): + # s = text_type(s, encoding, errors) + # else: + # s = text_type(s) + # elif hasattr(s, '__unicode__'): + # s = s.__unicode__() + # else: + # s = text_type(bytes(s), encoding, errors) + # else: + # s = s.decode(encoding, errors) + # except UnicodeDecodeError as e: + # if not isinstance(s, Exception): + # raise FlaskMailUnicodeDecodeError(s, *e.args) + + # return s + + +def sanitize_subject(subject, encoding='utf-8'): + try: + subject.encode('ascii') + except UnicodeEncodeError: + try: + subject = Header(subject, encoding).encode() + except UnicodeEncodeError: + subject = Header(subject, 'utf-8').encode() + return subject + + +def sanitize_address(addr, encoding='utf-8'): + if isinstance(addr, str): + addr = parseaddr(force_text(addr)) + nm, addr = addr + + try: + nm = Header(nm, encoding).encode() + except UnicodeEncodeError: + nm = Header(nm, 'utf-8').encode() + try: + addr.encode('ascii') + except UnicodeEncodeError: # IDN + if '@' in addr: + localpart, domain = addr.split('@', 1) + localpart = str(Header(localpart, encoding)) + domain = domain.encode('idna').decode('ascii') + addr = '@'.join([localpart, domain]) + else: + addr = Header(addr, encoding).encode() + return formataddr((nm, addr)) + + +def sanitize_addresses(addresses, encoding='utf-8'): + return map(lambda e: sanitize_address(e, encoding), addresses) + + +def _has_newline(line): + """Used by has_bad_header to check for \\r or \\n""" + if line and ('\r' in line or '\n' in line): + return True + return False + -class Connection: - """Handles connection to host""" - pass class Attachment: - """Encapsulates file attachment information""" - pass + """Encapsulates file attachment information. + + :param filename: filename of attachment + :param content_type: file mimetype + :param data: the raw file data + :param disposition: content-disposition (if any) + """ + + def __init__(self, filename=None, content_type=None, data=None, + disposition=None, headers=None): + self.filename = filename + self.content_type = content_type + self.data = data + self.disposition = disposition or 'attachment' + self.headers = headers or {} class Message: - """Encapsulates an email message""" + """Encapsulates an email message. + + :param subject: email subject header + :param recipients: list of email addresses + :param body: plain text message + :param html: HTML message + :param alts: A dict or an iterable to go through dict() that contains multipart alternatives + :param sender: email sender address, or **MAIL_DEFAULT_SENDER** by default + :param cc: CC list + :param bcc: BCC list + :param attachments: list of Attachment instances + :param reply_to: reply-to address + :param date: send date + :param charset: message character set + :param extra_headers: A dictionary of additional headers for the message + :param mail_options: A list of ESMTP options to be used in MAIL FROM command + :param rcpt_options: A list of ESMTP options to be used in RCPT commands + """ + + def __init__(self, subject='', + recipients=None, + body=None, + html=None, + alts=None, + sender=None, + cc=None, + bcc=None, + attachments=None, + reply_to=None, + date=None, + charset=None, + extra_headers=None, + mail_options=None, + rcpt_options=None): + + if isinstance(sender, tuple): + sender = "{} <{}>".format(*sender) - def __init__(self, subject: str, - recipients: typing.List[str]=None): - self.subject = subject self.recipients = recipients or [] + self.subject = subject + self.sender = sender + self.reply_to = reply_to + self.cc = cc or [] + self.bcc = bcc or [] + self.body = body + self.alts = dict(alts or {}) + self.html = html + self.date = date + self.msgId = make_msgid() + self.charset = charset + self.extra_headers = extra_headers + self.mail_options = mail_options or [] + self.rcpt_options = rcpt_options or [] + self.attachments = attachments or [] + + @property + def send_to(self): + return set(self.recipients) | set(self.bcc or ()) | set(self.cc or ()) + + @property + def html(self): + return self.alts.get('html') + + @html.setter + def html(self, value): + if value is None: + self.alts.pop('html', None) + else: + self.alts['html'] = value + + def _mimetext(self, text, subtype='plain'): + """Creates a MIMEText object with the given subtype (default: 'plain') + If the text is unicode, the utf-8 charset is used. + """ + charset = self.charset or 'utf-8' + return MIMEText(text, _subtype=subtype, _charset=charset) + + def _message(self): + """Creates the email""" + ascii_attachments = None # current_app.extensions['mail'].ascii_attachments # What does this do? + encoding = self.charset or 'utf-8' + + attachments = self.attachments or [] + + if len(attachments) == 0 and not self.alts: + # No html content and zero attachments means plain text + msg = self._mimetext(self.body) + elif len(attachments) > 0 and not self.alts: + # No html and at least one attachment means multipart + msg = MIMEMultipart() + msg.attach(self._mimetext(self.body)) + else: + # Anything else + msg = MIMEMultipart() + alternative = MIMEMultipart('alternative') + alternative.attach(self._mimetext(self.body, 'plain')) + for mimetype, content in self.alts.items(): + alternative.attach(self._mimetext(content, mimetype)) + msg.attach(alternative) + + if self.subject: + msg['Subject'] = sanitize_subject(force_text(self.subject), encoding) + + msg['From'] = sanitize_address(self.sender, encoding) + msg['To'] = ', '.join(list(set(sanitize_addresses(self.recipients, encoding)))) + + msg['Date'] = formatdate(self.date, localtime=True) + # see RFC 5322 section 3.6.4. + msg['Message-ID'] = self.msgId + + if self.cc: + msg['Cc'] = ', '.join(list(set(sanitize_addresses(self.cc, encoding)))) + + if self.reply_to: + msg['Reply-To'] = sanitize_address(self.reply_to, encoding) + + if self.extra_headers: + for k, v in self.extra_headers.items(): + msg[k] = v + + SPACES = re.compile(r'[\s]+', re.UNICODE) + for attachment in attachments: + f = MIMEBase(*attachment.content_type.split('/')) + f.set_payload(attachment.data) + encode_base64(f) + + filename = attachment.filename + if filename and ascii_attachments: + # force filename to ascii + filename = unicodedata.normalize('NFKD', filename) + filename = filename.encode('ascii', 'ignore').decode('ascii') + filename = SPACES.sub(u' ', filename).strip() + + try: + filename and filename.encode('ascii') + # TODO Figure out why this is needed. + except UnicodeEncodeError: + filename = ('UTF8', '', filename) + + f.add_header('Content-Disposition', + attachment.disposition, + filename=filename) + + for key, value in attachment.headers.items(): + f.add_header(key, value) + + msg.attach(f) + + # TODO FIGURE out why this was needed for PY3 in the original package + msg.policy = policy.SMTP + + return msg + + # TODO Figure out if this is needed anymore + def as_string(self): + return self._message().as_string() + + def as_bytes(self): + return self._message().as_bytes() + + def __str__(self): + return self.as_string() + + def __bytes__(self): + return self.as_bytes() + + def has_bad_headers(self): + """Checks for bad headers i.e. newlines in subject, sender or recipients. + RFC5322: Allows multiline CRLF with trailing whitespace (FWS) in headers + """ + + headers = [self.sender, self.reply_to] + self.recipients + for header in headers: + if _has_newline(header): + return True + + if self.subject: + if _has_newline(self.subject): + for linenum, line in enumerate(self.subject.split('\r\n')): + if not line: + return True + if linenum > 0 and line[0] not in '\t ': + return True + if _has_newline(line): + return True + if len(line.strip()) == 0: + return True + return False + + def is_bad_headers(self): + from warnings import warn + msg = 'is_bad_headers is deprecated, use the new has_bad_headers method instead.' + warn(DeprecationWarning(msg), stacklevel=1) + return self.has_bad_headers() + + def send(self, connection): + """Verifies and sends the message.""" + + connection.send(self) + + def add_recipient(self, recipient): + """Adds another recipient to the message. + + :param recipient: email address of recipient. + """ - def add_recipient(self, recipient: str): - assert isinstance(recipient, str) self.recipients.append(recipient) + def attach(self, + filename=None, + content_type=None, + data=None, + disposition=None, + headers=None): + """Adds an attachment to the message. + + :param filename: filename of attachment + :param content_type: file mimetype + :param data: the raw file data + :param disposition: content-disposition (if any) + """ + self.attachments.append( + Attachment(filename, content_type, data, disposition, headers)) + + +class Connection: + """Handles connection to host""" + + def __init__(self, mailer): + """ + configure new connection + + :param mailer: the application mail manager + """ + self.mailer = mailer -class _MailMixin: - pass + def __enter__(self): + if self.mailer.mail_suppress_send: + self.host = None + else: + self.host = self.configure_host() + self.num_emails = 0 -class _Mail: - pass + return self + def __exit__(self, exc_type, exc_value, tb): + if self.host: + self.host.quit() -class Mail: + def configure_host(self): + if self.mailer.mail_use_ssl: + host = smtplib.SMTP_SSL(self.mailer.mail_server, self.mailer.mail_port) + else: + host = smtplib.SMTP(self.mailer.mail_server, self.mailer.mail_port) + + host.set_debuglevel(int(self.mailer.mail_debug)) + + if self.mailer.mail_use_tls: + host.starttls() + if self.mailer.mail_user and self.mailer.mail_password: + host.login(self.mailer.mail_user, self.mailer.mail_password) + + return host + + def send(self, message, envelope_from=None): + """Verifies and sends message. + + :param message: Message instance. + :param envelope_from: Email address to be used in MAIL FROM command. + """ + assert message.send_to, "No recipients have been added" + + assert message.sender, ( + "The message does not specify a sender and a default sender " + "has not been configured") + + if message.has_bad_headers(): + raise BadHeaderError + + if message.date is None: + message.date = time.time() + + if self.host: + self.host.sendmail(sanitize_address(envelope_from or message.sender), + list(sanitize_addresses(message.send_to)), + message.as_bytes(), + message.mail_options, + message.rcpt_options) + + self.num_emails += 1 + + if self.num_emails == self.mailer.mail_max_emails: + self.num_emails = 0 + if self.host: + self.host.quit() + self.host = self.configure_host() + + def send_message(self, *args, **kwargs): + """Shortcut for send(msg). + + Takes same arguments as Message constructor. + """ + + self.send(Message(*args, **kwargs)) + + +class Mailer: """Manages email messaging""" - pass + + def __init__(self, settings: Settings): + """ + Configure a new Mailer + + Args: + settings: The application settings dictionary + """ + mail_config = settings.get('EMAIL') + self.mail_server = mail_config.get('MAIL_SERVER') + self.mail_user = mail_config.get('MAIL_USERNAME') + self.mail_password = mail_config.get('MAIL_PASSWORD') + self.mail_port = mail_config.get('MAIL_PORT', 25) + self.mail_use_tls = mail_config.get('MAIL_USE_TLS', False) + self.mail_use_ssl = mail_config.get('MAIL_USE_SSL', False) + self.mail_default_sender = mail_config.get('MAIL_DEFAULT_SENDER') + self.mail_debug = mail_config.get('MAIL_DEBUG', False) + self.mail_max_emails = mail_config.get('MAIL_MAX_EMAILS') + self.mail_suppress_send = mail_config.get('MAIL_SUPPRESS_SEND', False) + self.mail_ascii_attachments = mail_config.get('MAIL_ASCII_ATTACHMENTS', False) + + def send(self, message): + """ + Sends a single message instance. If TESTING is True the message will + not actually be sent. + + :param message: a Message instance. + """ + if message.sender is None: + message.sender = self.mail_default_sender + + with self.connect() as connection: + message.send(connection) + + def send_message(self, *args, **kwargs): + self.send(Message(*args, **kwargs)) + + def connect(self): + """Opens a connection to the mail host.""" + return Connection(self) diff --git a/tests/test_apistar_mail.py b/tests/test_apistar_mail.py index a124c67..a63cda5 100644 --- a/tests/test_apistar_mail.py +++ b/tests/test_apistar_mail.py @@ -1,21 +1,52 @@ -from apistar_mail.mail import Message +from apistar_mail.mail import Message, Mailer, force_text +from apistar_mail.exc import MailUnicodeDecodeError + +import pytest + +settings = { + 'EMAIL': { + 'MAIL_SERVER': 'smtp.example.com', + 'MAIL_USERNAME': 'fake@example.com', + 'MAIL_PASSWORD': 'secret', + 'MAIL_PORT': 587, + 'MAIL_USE_TLS': True, + 'MAIL_SUPPRESS_SEND': True, + 'MAIL_DEFAULT_SENDER': 'fake@example.com' + } +} def test_message_init(): msg = Message(subject="subject", - recipients=['fake@example.com']) + recipients=["fake@example.com"], + body="body") assert msg.subject == "subject" - assert msg.recipients == ['fake@example.com'] + assert msg.recipients == ["fake@example.com"] + assert msg.body == "body" def test_empty_recipient_list_init(): - msg1 = Message(subject="subject") - assert msg1.recipients == [] + msg = Message(subject="subject") + assert msg.recipients == [] def test_add_recipient(): msg1 = Message(subject="subject") assert msg1.recipients == [] - msg1.add_recipient('fake@example.com') + msg1.add_recipient("fake@example.com") assert len(msg1.recipients) == 1 - assert msg1.recipients[0] == 'fake@example.com' \ No newline at end of file + assert msg1.recipients[0] == "fake@example.com" + + +def test_empty_cc_list(): + msg = Message(subject="subject", + recipients=['fake@example.com'] + ) + assert msg.cc == [] + + +def test_raise_unicode_decode_error(): + value = b'\xe5\x93\x88\xe5\x93\x88' + with pytest.raises(MailUnicodeDecodeError) as excinfo: + force_text(value, encoding='ascii') + assert 'You passed in' in str(excinfo) \ No newline at end of file