mattwalshdev/emailee

View on GitHub
emailee/emailee.py

Summary

Maintainability
D
2 days
Test Coverage
import enum
import mimetypes
import re
import smtplib
from email.mime.application import MIMEApplication
from email.mime.audio import MIMEAudio
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from pathlib import Path
from typing import BinaryIO, List, TextIO, Union


def _helperOutputFileCheck(filename: str) -> bool:
    try:
        if Path(filename).is_file():
            try:
                fileSize: int = Path(filename).stat().st_size
            except Exception as error:
                raise error
            if fileSize != 0:
                raise OSError(
                    "outputFile is not empty, please empty or delete it and try again"
                )
        else:
            try:
                Path(filename).touch()
            except Exception as error:
                raise PermissionError(f"No access to create the new file - {error}")
    except Exception as error:
        raise PermissionError(
            f"No access to file location to check if file exists - {error}"
        )
    return True


class _EncType(enum.Enum):
    """
    Enumerations of TLS and SSL, for encryption logic in Emailee
    """

    SSL = 1
    TLS = 2


class Emailee:
    """
    A robust SMTP email solution for Python.
    Zero external dependencies (unless testing).
    Allows sending via SSL or TLS, attachments, reply-to and to/cc/bcc.
    Emails are built with the below structure -

    multipart/mixed
        ├─── multipart/related
        │    └── multipart/alternative
        │        ├─── text/plain
        │        └─── text/html
        ├─── application/text - embedded attachment
        ├─── application/image - embedded attachment
        ├─── application/audio - embedded attachment
        └─── application/pdf etc. - other application attachment

    See https://github.com/mattwalshdev/emailee for additional documentation

    Methods
    -------
        sender(sender: str, replyTo: str) - email (required), email
        subject(subject: str) - text
        msgContent(msgText: str, msgHTML: str) - raw text, html text
        sendTo(to: List[str], cc: List[str], bcc: List[str], ignoreErrors: bool) -
            List of emails in each, one required. Set ignoreErrors True if you
            want it to still send emails if there is 1+ invalid emails listed
        attachmentFiles(attachmentFiles: List[str]) - List of full and/or
            relative paths to files
        server(smtpServer: str, port: int, SSLTLS: str, authUsername: str,
            authPassword: str, timeout: int) - servername (required),
            port no, encryption type, credentials,
            timeout for server connection timeout in seconds, default 30 seconds,
            increase if on slow connection
        ready() - will return True if all required fields are entered and valid,
            does not guarantee emails actually exist
        send(outputFile: str) - attempt to send the email, optional addition of an
            outputFile to save metadata to. OutputFile is mandatory when sending emails in either async mode

    Example
    -------
    import emailee

    my_email = emailee.Emailee()
    my_email.sender("john.smith@fakeemail.com")
    my_email.subject("Test email")
    my_email.msgContent("raw text, poorly formatted")
    my_email.sendTo(["jill.smith@fakeemail.com"])
    my_email.server("smtp.fakeemail.com")

    if my_email.ready():
        my_email.send()
    """

    _EMAIL_REGEX = re.compile(r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)")

    def __init__(self) -> None:
        self._sender: str = ""
        self._replyTo: str = ""
        self._subject: str = ""
        self._msgText: str = ""
        self._msgHTML: str = ""
        self._to: List[str] = []
        self._cc: List[str] = []
        self._bcc: List[str] = []
        self._attachmentFiles: List[str] = []
        self._smtpServer: str = ""
        self._port: int = 0
        self._SSLTLS: Union[_EncType, None] = None
        self._authUsername: str = ""
        self._authPassword: str = ""
        self._timeout: int = 30
        self._outputFile: str = ""
        self._outputFileReady: bool = False

    def __repr__(self) -> str:
        outputDict = {
            "sender": self._sender,
            "replyTo": self._replyTo,
            "subject": self._subject,
            "msgText": self._msgText,
            "msgHTML": self._msgHTML,
            "to": self._to,
            "cc": self._cc,
            "bcc": self._bcc,
            "attachmentFiles": self._attachmentFiles,
            "smtpServer": self._smtpServer,
            "port": self._port,
            "SSLTLS": self._SSLTLS,
        }
        return str(outputDict)

    def __str__(self) -> str:
        outputDict = {
            "sender": self._sender,
            "replyTo": self._replyTo,
            "subject": self._subject,
            "to": self._to,
            "cc": self._cc,
            "bcc": self._bcc,
        }
        return str(outputDict)

    def _addressCheck(
        self, emailList: List[str], type: str, ignoreErrors: bool
    ) -> List[str]:
        self._tempEmailList = []
        for email in emailList:
            if not isinstance(email, str):
                raise TypeError(f"{type} email not a string - {email}")

            if not Emailee._EMAIL_REGEX.fullmatch(email):
                if not ignoreErrors:
                    raise ValueError(f"Invalid {type} email - {email}")
                else:
                    continue
            if ignoreErrors:
                self._tempEmailList.append(email)
        if ignoreErrors:
            return self._tempEmailList
        return emailList

    def sender(self, sender: str, replyTo: str = "") -> None:
        if not isinstance(sender, str):
            raise TypeError("sender not a string")

        self._sender = self._addressCheck([sender], "sender", ignoreErrors=False)[0]

        if replyTo:
            if not isinstance(replyTo, str):
                raise TypeError("replyTo not a string")

            self._replyTo = self._addressCheck(
                [replyTo], "replyTo", ignoreErrors=False
            )[0]

    def subject(self, subject: str) -> None:
        if not isinstance(subject, str):
            raise TypeError("subject not a string")

        if len(subject) > 255:
            raise ValueError("subject string too long")
        self._subject = subject

    def msgContent(self, msgText: str = "", msgHTML: str = "") -> None:
        if not isinstance(msgText, str):
            raise TypeError("msgText not a string")

        if not isinstance(msgHTML, str):
            raise TypeError("msgHTML not a string")

        self._msgText = msgText
        self._msgHTML = msgHTML

    def sendTo(
        self,
        to: List[str] = [],
        cc: List[str] = [],
        bcc: List[str] = [],
        ignoreErrors: bool = False,
    ) -> None:
        if not to and not cc and not bcc:
            raise ValueError("No email addresses to send to are defined")

        if (
            not isinstance(to, list)
            or not isinstance(cc, list)
            or not isinstance(bcc, list)
        ):
            raise TypeError("sendTo items are not in list format")

        if not isinstance(ignoreErrors, bool):
            raise TypeError("sendTo ignoreErrors is not a bool")

        self._to = self._addressCheck(to, "to", ignoreErrors)
        self._cc = self._addressCheck(cc, "cc", ignoreErrors)
        self._bcc = self._addressCheck(bcc, "bcc", ignoreErrors)

        if not self._to and not self._cc and not self._bcc:
            raise ValueError("No email addresses to send to are properly defined")

    def attachmentFiles(self, attachmentFiles: List[str] = []) -> None:
        if not isinstance(attachmentFiles, list):
            raise TypeError("attachmentFiles not in list format")

        for file in attachmentFiles:
            if not isinstance(file, str):
                raise TypeError(f"file name listed not in string format - {file}")

            if not Path(file).is_file():
                raise ValueError(
                    f"Attachment listed is not a file or not accessible - {file}"
                )

        self._attachmentFiles = attachmentFiles

    def server(
        self,
        smtpServer: str,
        *,
        port: int = 0,
        SSLTLS: str = "",
        authUsername: str = "",
        authPassword: str = "",
        timeout: int = 30,
    ) -> None:
        if not isinstance(smtpServer, str):
            raise TypeError("smtpServer name not in string format")

        if not isinstance(port, int):
            raise TypeError("smtpServer port not in integer format")

        if not isinstance(SSLTLS, str):
            raise TypeError("smtpServer SSLTLS option not in string format")

        if not isinstance(authUsername, str):
            raise TypeError("smtpServer auth username not in string format")

        if not isinstance(authPassword, str):
            raise TypeError("smtpServer auth password not in string format")

        if not isinstance(timeout, int):
            raise TypeError("timeout server option not in integer format")

        if port < 0 or port > 65535:
            raise ValueError("Port is an invalid number")

        if SSLTLS not in _EncType.__members__ and SSLTLS != "":
            raise ValueError("SSLTLS option is invalid")

        if timeout <= 0:
            raise ValueError("Server timeout period is an invalid number")

        self._smtpServer = smtpServer

        if port > 0:
            self._port = port
        elif SSLTLS == "TLS":
            self._port = 587
        elif SSLTLS == "SSL":
            self._port = 465
        else:
            self._port = 25

        if SSLTLS:
            self._SSLTLS = _EncType[f"{SSLTLS}"]
        elif port == 587:
            self._SSLTLS = _EncType.TLS
        elif port == 465:
            self._SSLTLS = _EncType.SSL

        if authUsername and authPassword:
            self._authUsername = authUsername
            self._authPassword = authPassword
        elif authPassword:
            self._authUsername = self._sender
            self._authPassword = authPassword

        self._timeout = timeout

    def ready(self) -> bool:
        if self._authPassword and not self._authUsername and self._sender:
            # this may happen if server methods are invoked before
            #  the sender method AND the person has not added an
            #  authUsername BUT has added an authPassword
            self._authUsername = self._sender

        if (
            self._sender
            and (self._to or self._cc or self._bcc)
            and self._smtpServer
            and self._port
        ):
            # minimum requirements in place to send an email
            return True
        else:
            return False

    def _generate(self) -> str:
        message = MIMEMultipart("mixed")

        message["to"] = ", ".join(self._to) if self._to else ""

        message["cc"] = ", ".join(self._cc) if self._cc else ""

        message["from"] = self._sender
        message["subject"] = self._subject
        message["date"] = formatdate(localtime=True)

        if self._replyTo:
            message.add_header("reply-to", self._replyTo)

        messageRelated = MIMEMultipart("related")
        messageAlternative = MIMEMultipart("alternative")

        if self._msgText:
            messageAlternative.attach(MIMEText(self._msgText, "plain"))
        if self._msgHTML:
            messageAlternative.attach(MIMEText(self._msgHTML, "html"))

        messageRelated.attach(messageAlternative)

        message.attach(messageRelated)

        for attFile in self._attachmentFiles or []:
            contentType, encoding = mimetypes.guess_type(attFile)

            if contentType is None or encoding is not None:
                contentType = "application/octet-stream"
            mainType, subType = contentType.split("/", 1)

            attachment: Union[MIMEText, MIMEImage, MIMEAudio, MIMEApplication]
            fp: Union[TextIO, BinaryIO]

            if mainType == "text":
                with open(attFile, "r") as fp:
                    attachment = MIMEText(fp.read(), _subtype=subType)
            else:
                with open(attFile, "rb") as fp:
                    if mainType == "image":
                        attachment = MIMEImage(fp.read(), _subtype=subType)
                    elif mainType == "audio":
                        attachment = MIMEAudio(fp.read(), _subtype=subType)
                    else:
                        attachment = MIMEApplication(fp.read(), _subtype=subType)

            filename = Path(attFile).stem
            attachment.add_header(
                "Content-Disposition", "attachment; filename=" + filename
            )
            message.attach(attachment)

        return message.as_string()

    def send(self, outputFile: str = "") -> bool:
        if self._authPassword and not self._authUsername and self._sender:
            # this may happen if server methods are invoked before
            #  the sender method AND the person has not added an
            #  authUsername BUT has added an authPassword
            self._authUsername = self._sender

        if not isinstance(outputFile, str):
            raise TypeError("outputFile path not in string format")

        # run checks against the output file, unless it's already been run
        #  during this run, this is for async batch jobs of emails
        #  where send is called repeatedly
        if outputFile:
            if not self._outputFileReady:
                self._outputFileReady = _helperOutputFileCheck(outputFile)
            self._outputFile = outputFile

        try:
            generatedEmail: str = self._generate()
        except Exception as error:
            raise (error)

        try:
            smtp: object

            if self._SSLTLS == _EncType.SSL:
                try:
                    smtp = smtplib.SMTP_SSL(
                        self._smtpServer, self._port, timeout=self._timeout
                    )
                except Exception as error:
                    raise ValueError(error)
            else:
                try:
                    smtp = smtplib.SMTP(
                        self._smtpServer, self._port, timeout=self._timeout
                    )
                except Exception as error:
                    raise ValueError(error)

            smtp.ehlo()

            if self._SSLTLS == _EncType.TLS:
                smtp.starttls()
                smtp.ehlo()
        except Exception as error:
            raise error
        if self._authUsername and self._authPassword:
            try:
                smtp.login(self._authUsername, self._authPassword)
            except Exception as error:
                raise ValueError(error)

        try:
            smtp.sendmail(self._sender, self._to + self._cc + self._bcc, generatedEmail)
        except Exception as error:
            raise ValueError(error)

        try:
            smtp.close()
            if self._outputFile:
                with open(self._outputFile, "a") as file:
                    file.write(self.__str__() + "\n")
            return True
        except Exception as error:
            raise (error)