Source code for Exscript.util.mail

#
# Copyright (C) 2010-2017 Samuel Abels
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
Sending and formatting emails.
"""
from builtins import str
from builtins import object
import os
import time
import re
import socket
import smtplib
import mimetypes
from getpass import getuser
from email import encoders
from email.mime.multipart import MIMEMultipart
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.text import MIMEText
from Exscript.util.event import Event

#
# Helpers. (non-public)
#
_varname_re = re.compile(r'[a-z][\w_]*',     re.I)
_string_re = re.compile(r'(\\?){([\w_]+)}', re.I)


class _TemplateParser(object):

    """
    This exists for backward compatibility; Python 2.3 does not come
    with a similar way for string substitution yet.
    """

    def __init__(self):
        self.tmpl_vars = None

    # Tokens that include variables in a string may use this callback to
    # substitute the variable against its value.
    def _variable_sub_cb(self, match):
        escape = match.group(1)
        varname = match.group(2)
        if escape == '\\':
            return '$' + varname
        if not _varname_re.match(varname):
            raise Exception('%s is not a variable name' % varname)
        value = self.tmpl_vars.get(varname)
        if value is None:
            raise Exception('Undefined value for %s' % varname)
        elif hasattr(value, '__iter__') and not isinstance(value, str):
            value = '\n'.join([str(v) for v in value])
        return str(value)

    def parse(self, template, **kwargs):
        self.tmpl_vars = kwargs
        output = ''
        for line in template.split('\n'):
            if line.endswith(' '):
                output += line
            else:
                output += line + '\n'
        return _string_re.sub(self._variable_sub_cb, output)


def _render_template(string, **vars):
    default = {'date': time.strftime('%Y-%m-%d'),
               'user': getuser()}
    default.update(vars)
    parser = _TemplateParser()
    return parser.parse(string, **default)


def _is_header_line(line):
    return re.match(r'^\w+: .+$', line) is not None


def _get_var_from_header_line(line):
    match = re.match(r'^(\w+): (.+)$', line)
    return match.group(1).strip().lower(), match.group(2).strip()


def _cleanup_mail_addresses(recipients):
    if isinstance(recipients, list):
        recipients = ','.join(recipients)
    rcpt = re.split(r'\s*[,;\r\n]\s*', recipients.lower())
    return [str(r) for r in sorted(set(rcpt)) if r.strip()]

#
# Public.
#


[docs]class Mail(object): """ Represents an email. """
[docs] def __init__(self, sender=None, to='', cc='', bcc='', subject='', body=''): """ Creates a new email with the given values. If the given sender is None, one will be automatically chosen using getpass.getuser(). :type sender: string :param sender: The email address of the sender. :type to: string|list(string) :param to: A list of email addresses, passed to set_to(). :type cc: string|list(string) :param cc: A list of email addresses, passed to set_cc(). :type bcc: string|list(string) :param bcc: A list of email addresses, passed to set_bcc(). :type subject: string :param subject: A subject line, passed to set_subject(). :type body: string :param body: The email body, passed to set_body(). """ self.changed_event = Event() self.files = [] self.sender = None self.cc = None self.bcc = None self.to = None self.subject = None self.body = None if not sender: domain = socket.getfqdn('localhost') sender = getuser() + '@' + domain self.set_sender(sender) self.set_to(to) self.set_cc(cc) self.set_bcc(bcc) self.set_subject(subject) self.set_body(body)
[docs] def set_from_template_string(self, string): """ Reads the given template (SMTP formatted) and sets all fields accordingly. :type string: string :param string: The template. """ in_header = True body = '' for line in string.split('\n'): if not in_header: body += line + '\n' continue if not _is_header_line(line): body += line + '\n' in_header = False continue key, value = _get_var_from_header_line(line) if key == 'from': self.set_sender(value) elif key == 'to': self.add_to(value) elif key == 'cc': self.add_cc(value) elif key == 'bcc': self.add_bcc(value) elif key == 'subject': self.set_subject(value) else: raise Exception('Invalid header field "%s"' % key) self.set_body(body.strip())
[docs] def set_sender(self, sender): """ Defines the value of the "From:" field. :type sender: string :param sender: The email address of the sender. """ self.sender = sender self.changed_event()
[docs] def get_sender(self): """ Returns the value of the "From:" field. :rtype: string :return: The email address of the sender. """ return self.sender
[docs] def set_to(self, to): """ Replaces the current list of recipients in the 'to' field by the given value. The value may be one of the following: - A list of strings (email addresses). - A comma separated string containing one or more email addresses. :type to: string|list(string) :param to: The email addresses for the 'to' field. """ self.to = _cleanup_mail_addresses(to) self.changed_event()
[docs] def add_to(self, to): """ Adds the given list of recipients to the 'to' field. Accepts the same argument types as set_to(). :type to: string|list(string) :param to: The list of email addresses. """ self.to += _cleanup_mail_addresses(to) self.changed_event()
[docs] def get_to(self): """ Returns the value of the "to" field. :rtype: list(string) :return: The email addresses in the 'to' field. """ return self.to
[docs] def set_cc(self, cc): """ Like set_to(), but for the 'cc' field. :type cc: string|list(string) :param cc: The email addresses for the 'cc' field. """ self.cc = _cleanup_mail_addresses(cc) self.changed_event()
[docs] def add_cc(self, cc): """ Like add_to(), but for the 'cc' field. :type cc: string|list(string) :param cc: The list of email addresses. """ self.cc += _cleanup_mail_addresses(cc) self.changed_event()
[docs] def get_cc(self): """ Returns the value of the "cc" field. :rtype: list(string) :return: The email addresses in the 'cc' field. """ return self.cc
[docs] def set_bcc(self, bcc): """ Like set_to(), but for the 'bcc' field. :type bcc: string|list(string) :param bcc: The email addresses for the 'bcc' field. """ self.bcc = _cleanup_mail_addresses(bcc) self.changed_event()
[docs] def add_bcc(self, bcc): """ Like add_to(), but for the 'bcc' field. :type bcc: string|list(string) :param bcc: The list of email addresses. """ self.bcc += _cleanup_mail_addresses(bcc) self.changed_event()
[docs] def get_bcc(self): """ Returns the value of the "bcc" field. :rtype: list(string) :return: The email addresses in the 'bcc' field. """ return self.bcc
[docs] def get_recipients(self): """ Returns a list of all recipients (to, cc, and bcc). :rtype: list(string) :return: The email addresses of all recipients. """ return self.get_to() + self.get_cc() + self.get_bcc()
[docs] def set_subject(self, subject): """ Defines the subject line. :type subject: string :param subject: The new subject line. """ self.subject = subject self.changed_event()
[docs] def get_subject(self): """ Returns the subject line. :rtype: string :return: The subject line. """ return self.subject
[docs] def set_body(self, body): """ Defines the body of the mail. :type body: string :param body: The new email body. """ self.body = body self.changed_event()
[docs] def get_body(self): """ Returns the body of the mail. :rtype: string :return: The body of the mail. """ return self.body
[docs] def get_smtp_header(self): """ Returns the SMTP formatted header of the line. :rtype: string :return: The SMTP header. """ header = "From: %s\r\n" % self.get_sender() header += "To: %s\r\n" % ',\r\n '.join(self.get_to()) header += "Cc: %s\r\n" % ',\r\n '.join(self.get_cc()) header += "Bcc: %s\r\n" % ',\r\n '.join(self.get_bcc()) header += "Subject: %s\r\n" % self.get_subject() return header
[docs] def get_smtp_mail(self): """ Returns the SMTP formatted email, as it may be passed to sendmail. :rtype: string :return: The SMTP formatted mail. """ header = self.get_smtp_header() body = self.get_body().replace('\n', '\r\n') return header + '\r\n' + body + '\r\n'
[docs] def add_attachment(self, filename): """ Adds the file with the given name as an attachment. :type filename: string :param filename: A filename. """ self.files.append(filename)
[docs] def get_attachments(self): """ Returns a list of attached files. :rtype: list[string] :return: The list of filenames. """ return self.files
[docs]def from_template_string(string, **kwargs): """ Reads the given SMTP formatted template, and creates a new Mail object using the information. :type string: str :param string: The SMTP formatted template. :type kwargs: str :param kwargs: Variables to replace in the template. :rtype: Mail :return: The resulting mail. """ tmpl = _render_template(string, **kwargs) mail = Mail() mail.set_from_template_string(tmpl) return mail
[docs]def from_template(filename, **kwargs): """ Like from_template_string(), but reads the template from the file with the given name instead. :type filename: string :param filename: The name of the template file. :type kwargs: str :param kwargs: Variables to replace in the template. :rtype: Mail :return: The resulting mail. """ with open(filename) as fp: return from_template_string(fp.read(), **kwargs)
def _get_mime_object(filename): # Guess the content type based on the file's extension. Encoding # is ignored, although we should check for simple things like # gzip'd or compressed files. ctype, encoding = mimetypes.guess_type(filename) if ctype is None or encoding is not None: ctype = 'application/octet-stream' maintype, subtype = ctype.split('/', 1) if maintype == 'text': with open(filename) as fp: msg = MIMEText(fp.read(), _subtype=subtype) elif maintype == 'image': with open(filename, 'rb') as fp: msg = MIMEImage(fp.read(), _subtype=subtype) elif maintype == 'audio': with open(filename, 'rb') as fp: msg = MIMEAudio(fp.read(), _subtype=subtype) else: msg = MIMEBase(maintype, subtype) with open(filename, 'rb') as fp: msg.set_payload(fp.read()) encoders.encode_base64(msg) # Set the filename parameter msg.add_header('Content-Disposition', 'attachment', filename=os.path.basename(filename)) return msg
[docs]def send(mail, server='localhost'): """ Sends the given mail. :type mail: Mail :param mail: The mail object. :type server: string :param server: The address of the mailserver. """ sender = mail.get_sender() rcpt = mail.get_recipients() session = smtplib.SMTP(server) message = MIMEMultipart() message['Subject'] = mail.get_subject() message['From'] = mail.get_sender() message['To'] = ', '.join(mail.get_to()) message['Cc'] = ', '.join(mail.get_cc()) message.preamble = 'Your mail client is not MIME aware.' body = MIMEText(mail.get_body().encode("utf-8"), "plain", "utf-8") body.add_header('Content-Disposition', 'inline') message.attach(body) for filename in mail.get_attachments(): message.attach(_get_mime_object(filename)) session.sendmail(sender, rcpt, message.as_string())