mattwalshdev/emailee

View on GitHub
emailee/emailee_async.py

Summary

Maintainability
B
6 hrs
Test Coverage
import multiprocessing
import threading
import time
from typing import Any, Dict, List, Union

from .emailee import Emailee, _helperOutputFileCheck


class _SendAsync:
    """
    No point having a great SMTP library without the ability to send asynchronous mail!
    You can roll your own using the Emailee class but all the hard work is already done here.
    Uses either Threaded or Multiprocessing libraries:
        Threaded - Create x amount of threads on same CPU core
        Multiprocessing - Create processes utilising all CPU cores
    See docstrings for AsyncThreads and AsyncMP on how to implement.

    Parameters
    -------
        mailList -
            [
                {
                    'sender': sender email,
                    'replyTo': reply to email, optional
                    'subject': email subject, optional
                    'msgText': raw text message, optional
                    'msgHTML': HTML formatted message, optional
                    'to': list of to emails, optional
                    'cc': list of cc emails, optional
                    'bcc': list of bcc emails, optional
                    'ignoreErrors': will continue to send the email, skipping invalid to/cc/bcc mail items, optional
                    'attachmentFiles': list of attachment file paths, optional
                },
                {...},
            ]
        serverDict -
            {
                'smtpServer': SMTP server,
                'port': port number, optional
                'SSLTLS': SSL or TLS encryption, optional
                'authUsername': authenticated username, optional
                'authPassword': authenticated password, optional
            }
        outputFile: str - empty or non-existant text file to write successful sent email metadata to.
            Similar to AsyncThreads/AsyncMP.emailReport(), but the outputFile is written to during the email
            processing, so if there is an exception which crashes the mail run you will have a list of what successfully
            sent. Helpful to check against mail send throttling issues, which you may not notice otherwise.
        waitTime: int or float - time in seconds between each mail item sent, useful if your sending too
             many emails, too quickly utilising services like gmail, who will stop sending emails if it looks like spam
    """

    def __init__(
        self,
        mailList: List[Dict[str, Any]],
        serverDict: Dict[str, Any],
        outputFile: str,
        waitTime: Union[int, float] = 0,
    ) -> None:
        if not isinstance(mailList, list):
            raise TypeError("Email items not valid type")

        if not isinstance(serverDict, dict):
            raise TypeError("Server items not valid type")

        if not isinstance(waitTime, int) and not isinstance(waitTime, float):
            raise TypeError("Email wait time not valid number")

        if waitTime < 0:
            raise ValueError("waitTime cannot be a negative number")

        self._mailList: List[Dict[str, str]] = mailList
        self._serverDict: Dict[str, str] = serverDict
        self._waitTime: Union[int, float] = waitTime
        self._outputFileReady: bool = False

        self.emailReport: List[Dict[str, Any]] = []

        self._processQueue: multiprocessing.Queue = multiprocessing.Queue()

        self._outputFileReady = _helperOutputFileCheck(outputFile)
        self._outputFile: str = outputFile

        self._outputFileReady

    def _runTest(self):
        # run initial mail item procedurally to check server works
        try:
            _sendMailFunc(
                self._mailList[0],
                self._serverDict,
                self._processQueue,
                self._outputFile,
                self._outputFileReady,
            )
            self.emailReport.append(self._processQueue.get())
        except Exception as error:
            raise ValueError(error)


def _sendMailFunc(
    mailItem: Dict,
    mailServer: Dict,
    processQueue: multiprocessing.Queue,
    outputFile: str,
    outputFileReady: bool,
) -> None:
    """
    Helper function that sends each asynchronous mail item
    when using AsyncThreads or AsyncMP

    Parameters
    -------
        mailItem - Individual mail dict from _SendAsync.self._mailList
        mailServer - Server settings in dict from _SendAsync.self._serverDict
        processQueue - Queue object that sending results are added to
        outputFile - Text file append successful sends metadata to
    """

    mailItem.setdefault("sender", "")
    mailItem.setdefault("replyTo", "")
    mailItem.setdefault("subject", "")
    mailItem.setdefault("msgText", "")
    mailItem.setdefault("msgHTML", "")
    mailItem.setdefault("to", [])
    mailItem.setdefault("cc", [])
    mailItem.setdefault("bcc", [])
    mailItem.setdefault("ignoreErrors", False)
    mailItem.setdefault("attachmentFiles", [])

    mailServer.setdefault("smtpServer", [])
    mailServer.setdefault("port", 0)
    mailServer.setdefault("SSLTLS", None)
    mailServer.setdefault("authUsername", "")
    mailServer.setdefault("authPassword", "")
    mailServer.setdefault("timeout", 30)

    mail = Emailee()
    # override the class attribute as we've already checked the output
    #  file is valid for all async email sends
    mail._outputFileReady = outputFileReady
    mail.sender(sender=mailItem["sender"], replyTo=mailItem["replyTo"])
    mail.subject(mailItem["subject"])
    mail.msgContent(msgText=mailItem["msgText"], msgHTML=mailItem["msgHTML"])
    mail.sendTo(
        to=mailItem["to"],
        cc=mailItem["cc"],
        bcc=mailItem["bcc"],
        ignoreErrors=mailItem["ignoreErrors"],
    )
    mail.attachmentFiles(mailItem["attachmentFiles"])

    mail.server(
        smtpServer=mailServer["smtpServer"],
        port=mailServer["port"],
        SSLTLS=mailServer["SSLTLS"],
        authUsername=mailServer["authUsername"],
        authPassword=mailServer["authPassword"],
        timeout=mailServer["timeout"],
    )
    try:
        if mail.ready():
            mail.send(outputFile)
            processQueue.put(
                {
                    "sender": mailItem["sender"],
                    "replyTo": mailItem["replyTo"],
                    "subject": mailItem["subject"],
                    "to": mailItem["to"],
                    "cc": mailItem["cc"],
                    "bcc": mailItem["bcc"],
                }
            )
    except Exception as error:
        raise ValueError(error)


class _myThread(threading.Thread):
    """
    Class called by AsyncThreads to generate a new thread

    Parameters
    -------
        mailItem - Individual mail dict from _SendAsync.self._mailList
        serverDict - Server settings in dict from _SendAsync.self._serverDict
        poolSema - Pool BoundedSemaphore for threading
        processQueue - Queue object that sending results are added to
    """

    def __init__(
        self,
        mailItem: Dict,
        serverDict: Dict,
        poolSema: threading.BoundedSemaphore,
        processQueue: multiprocessing.Queue,
        outputFile: str,
        outputFileReady: bool,
    ) -> None:
        threading.Thread.__init__(self)

        self._mailItem: Dict = mailItem
        self._serverDict: Dict = serverDict
        self._poolSema = poolSema
        self._processQueue = processQueue
        self._outputFile: str = outputFile
        self._outputFileReady: bool = outputFileReady

    def run(self):
        self._poolSema.acquire()
        try:
            _sendMailFunc(
                self._mailItem,
                self._serverDict,
                self._processQueue,
                self._outputFile,
                self._outputFileReady,
            )
        except Exception as error:
            raise error
        finally:
            self._poolSema.release()


class AsyncThreads(_SendAsync):
    """
    Class to call to send emails asyncronously using threads.
    Threads are bound to the CPU that the core program sits on
    but can be rate limited if you don't want to max out your hardware.
    AsyncThreads.emailReport will return a list of all sent emails, useful for logging

    See https://github.com/mattwalshdev/emailee for additional documentation

    Parameters
    -------
        mailList - List of all mail items, sent to _SendAsync
        serverDict - Server settings in dict, sent to _SendAsync
        maxThreads - Maximum number of concurrent threads allowed, optional, default 10
        outputFile - Empty text file to store successfuly sends, sent to _SendAsync
        waitTime - Time in seconds before each email item, sent to _SendAsync

    Example
    -------
    import emailee

    emails = [{...}]
    server = {...}

    emails = emailee.AsyncThreads(emails, server, outputFile='output.txt', waitTime=0.250)
    print(emails.emailReport)
    """

    def __init__(
        self,
        mailList: List[Dict[str, Any]],
        serverDict: Dict[str, Any],
        outputFile: str,
        maxThreads: int = 10,
        waitTime: Union[int, float] = 0,
    ) -> None:
        _SendAsync.__init__(self, mailList, serverDict, outputFile, waitTime)

        if not isinstance(maxThreads, int):
            raise TypeError("maxThread is not valid int")

        if maxThreads <= 0:
            raise ValueError("maxThreads must be a number greater than 0")
        self._maxThreads = maxThreads

        self._active: List = []

        self.run()

    def run(self) -> List:
        self._runTest()
        if len(self._mailList) == 1:
            return self.emailReport

        self._poolSema = threading.BoundedSemaphore(value=self._maxThreads)

        for mailItem in self._mailList[1:]:
            newThread = _myThread(
                mailItem,
                self._serverDict,
                self._poolSema,
                self._processQueue,
                self._outputFile,
                self._outputFileReady,
            )
            newThread.start()
            self._active.append(newThread)
            time.sleep(self._waitTime)

        # wait for all items to complete before exiting
        for jobs in self._active:
            jobs.join()

        while self._processQueue.qsize():
            self.emailReport.append(self._processQueue.get())

        return self.emailReport


class AsyncMP(_SendAsync):
    """
    Class to call to send emails asyncronously using multiprocessing.
    Processes are able to utilise all CPU cores, but take up more memory
    due to isolated binaries copied for each process. Depending on work load
    you may utilise 100% of your CPU cycles running this.
    AsyncMP.emailReport will return a list of all sent emails, useful for logging

    See https://github.com/mattwalshdev/emailee for additional documentation

    Parameters
    -------
        mailList - List of all mail items, sent to _SendAsync
        serverDict - Server settings in dict, sent to _SendAsync
        outputFile - Empty text file to store successfuly sends, sent to _SendAsync
        waitTime - Time in seconds before each email item, sent to _SendAsync

    Example
    -------
    import emailee

    emails = [{...}]
    server = {...}

    emails = emailee.AsyncMP(emails, server, outputFile='output.txt', waitTime=0.250)
    print(emails.emailReport)
    """

    def __init__(
        self,
        mailList: List[Dict[str, Any]],
        serverDict: Dict[str, Any],
        outputFile: str,
        waitTime: Union[int, float] = 0,
    ) -> None:
        _SendAsync.__init__(self, mailList, serverDict, outputFile, waitTime)

        self._active: List = []

        self.run()

    def run(self) -> List:
        self._runTest()
        if len(self._mailList) == 1:
            return self.emailReport

        for mailItem in self._mailList[1:]:
            newProcess = multiprocessing.Process(
                target=_sendMailFunc,
                args=(
                    mailItem,
                    self._serverDict,
                    self._processQueue,
                    self._outputFile,
                    self._outputFileReady,
                ),
            )
            newProcess.start()
            self._active.append(newProcess)
            time.sleep(self._waitTime)

        # wait for all items to complete before exiting
        for jobs in self._active:
            jobs.join()

        while not self._processQueue.empty():
            self.emailReport.append(self._processQueue.get())

        return self.emailReport