martin-majlis/MailGun-V3

View on GitHub
mailgunv3/__init__.py

Summary

Maintainability
F
6 days
Test Coverage
from __future__ import unicode_literals

import base64
import enum
import hashlib
import hmac
import json
import random
import requests

#from mailgun3 import Mailgun
BASE_URL = 'https://api.mailgun.net/v3'


class MailGunV3(object):
    """
    This is the only class that should be directly initialized.
    """

    def __init__(self, domain, private_key, public_key):
        self.domain = domain
        self.private_key = private_key
        self.public_key = public_key
        self.auth = ('api', private_key)

    def _post(self, url, data, auth=None, files=None, include_domain=True):
        return requests.post(url, auth=auth or self.auth, data=data, files=files)

    def _get(self, url, params=None, auth=None, include_domain=True):
        return requests.get(url, auth=auth or self.auth, params=params)

    def _put(self, url, params=None, auth=None, include_domain=True):
        return requests.put(url, auth=auth or self.auth, params=params)

    def _delete(self, url, params=None, auth=None, include_domain=True):
        return requests.delete(url, auth=auth or self.auth, params=params)

    def mailinglist(self, email):
        return MailingList(self, email)

    def message(self,
                from_email,
                to,
                cc=None,
                bcc=None,
                subject=None,
                text=None,
                html=None,
                user_variables=None,
                reply_to=None,
                headers=None,
                inlines=None,
                attachments=None,
                campaign_id=None,
                tags=None):
        return MailMessage(
            self,
            from_email,
            to,
            cc=cc,
            bcc=bcc,
            subject=subject,
            text=text,
            html=html,
            user_variables=user_variables,
            reply_to=reply_to,
            headers=headers,
            inlines=inlines,
            attachments=attachments,
            campaign_id=campaign_id,
            tags=tags
        )


class APIResponse(object):
    def __init__(self):
        self.status_code = 200
        self.status_msg = 'OK'
        self.text = ''
        self.json = ''

    def _set_error(self, status_code, status_msg):
        self.status_code = status_code
        self.status_msg = status_msg

    def _has_error(self):
        return self.status_code != 200

    def __repr__(self):
        return repr({
            'status_code': self.status_code,
            'status_msg': self.status_msg,
            'text': self.text,
            'json': self.json,
        })

    def _copy_state(self, api_response):
        self.status_code = api_response.status_code
        self.status_msg = api_response.status_msg
        self.text = api_response.text
        self.json = api_response.json


class MailMessage(APIResponse):
    def __init__(self,
                 mailgun,
                 from_email,
                 to,
                 cc=None,
                 bcc=None,
                 subject=None,
                 text=None,
                 html=None,
                 user_variables=None,
                 reply_to=None,
                 headers=None,
                 inlines=None,
                 attachments=None,
                 campaign_id=None,
                 tags=None):
        super().__init__()
        self.mailgun = mailgun
        self.from_email = from_email
        self.to = to
        self.cc = cc or []
        self.bcc = bcc or []
        self.subject = subject or ''
        self.text = text
        self.html = html
        self.user_variables = user_variables
        self.reply_to = reply_to
        self.headers = headers
        self.inlines = inlines
        self.attachments = attachments
        self.campaign_id = campaign_id
        self.tags = tags

    def send(self):
                # sanity checks
        assert (self.text or self.html)

        data = {
            'from': self.from_email,
            'to': self.to,
            'cc': self.cc,
            'bcc': self.bcc,
            'subject': self.subject,
            'text': self.text,
            'html': self.html,
        }

        if self.reply_to:
            data['h:Reply-To'] = self.reply_to

        if self.headers:
            for k, v in self.headers.items():
                data["h:%s" % k] = v

        if self.campaign_id:
            data['o:campaign'] = self.campaign_id

        if self.tags:
            data['o:tag'] = self.tags

        if self.user_variables:
            for k, v in self.user_variables.items():
                data['v:%s' % k] = v

        files = []

        if self.inlines:
            for filename in self.inlines:
                files.append(('inline', open(filename)))

        if self.attachments:
            for filename, content_type, content in self.attachments:
                files.append(('attachment', (filename, content, content_type)))

        res = self.mailgun._post(
            BASE_URL + '/' + self.mailgun.domain + '/messages',
            data,
            files=files
        )
        self.text = res.text
        self.json = res.json
        if (res.status_code != 200):
            self._set_error(
                res.status_code,
                'Error during sending e-mail to {}.'.format(self.to),
            )
        return self

    def get(self):
        if self._has_error():
            return self

        res = self.mailgun._get(BASE_URL + '/lists/' +
                                self.address)
        self.text = res.text
        self.json = res.json
        if (res.status_code != 200):
            self._set_error(
                res.status_code,
                'Error during mailing list {} retrieval.'.format(self.address),
            )
        return self


# https://documentation.mailgun.com/api-mailinglists.html#mailing-lists


class MailingList(APIResponse):
    def __init__(self, mailgun, address):
        super().__init__()
        self.mailgun = mailgun
        self.address = address

    def get(self):
        if self._has_error():
            return self

        res = self.mailgun._get(BASE_URL + '/lists/' +
                                self.address)
        self.text = res.text
        self.json = res.json
        if (res.status_code != 200):
            self._set_error(
                res.status_code,
                'Error during mailing list {} retrieval.'.format(self.address),
            )
        return self

    def delete(self):
        if self._has_error():
            return self

        res = self.mailgun._delete(BASE_URL + '/lists/' +
                                   self.address)

        self.text = res.text
        self.json = res.json
        if (res.status_code != 200):
            self._set_error(
                res.status_code,
                'Error during mailing list {} deletion.'.format(self.address),
            )
        return self

    def create(self, name=None, description=None, access_level=None):
        if self._has_error():
            return self

        data = {'address': self.address}
        if name:
            data['name'] = name

        if description:
            data['description'] = description

        if access_level and access_level in ['readonly', 'members', 'everyone']:
            data['access_level'] = access_level

        res = self.mailgun._post(BASE_URL + '/lists', data)
        self.text = res.text
        self.json = res.json
        if (res.status_code != 200):
            self._set_error(
                res.status_code,
                'Error during mailing list {} creation.'.format(self.address),
            )
        return self

    def update(self, name=None, description=None, access_level=None):
        if self._has_error():
            return self

        data = {'address': self.address}
        if name:
            data['name'] = name

        if description:
            data['description'] = description

        if access_level and access_level in ['readonly', 'members', 'everyone']:
            data['access_level'] = access_level

        res = self.mailgun._put(BASE_URL + '/lists/' + self.address, data)
        self.text = res.text
        self.json = res.json
        if (res.status_code != 200):
            self._set_error(
                res.status_code,
                'Error during mailing list {} update.'.format(self.address),
            )
        return self

    def members(self):
        if self._has_error():
            return self

        res = self.mailgun._get(BASE_URL + '/lists/' +
                                self.address + '/members/pages')
        self.text = res.text
        self.json = res.json
        if (res.status_code != 200):
            self._set_error(
                res.status_code,
                'Error during mailing list {} members.'.format(self.address),
            )
        return self

    def member(self, address):
        return MailingListMember(self.mailgun, self, address)


class MailingListMember(APIResponse):
    def __init__(self, mailgun, mailinglist, address):
        super().__init__()
        self.mailgun = mailgun
        self.mailinglist = mailinglist
        self.address = address
        self._copy_state(mailinglist)

    def create(self, name=None, params=None, subscribed=True, upsert=False):
        if self._has_error():
            return self

        data = {'address': self.address}
        if name:
            data['name'] = name

        if params:
            data['vars'] = json.dumps(params) if isinstance(
                params, dict) else params

        if not subscribed:
            data['subscribed'] = 'no'

        if upsert:
            data['upsert'] = 'yes'

        res = self.mailgun._post(BASE_URL + '/lists/' +
                                 self.mailinglist.address + '/members', data)
        self.text = res.text
        self.json = res.json
        if (res.status_code != 200):
            self._set_error(
                res.status_code,
                'Error during adding member {} into mailing list.'.format(
                    self.address,
                    self.mailinglist.address,
                ),
            )
        return self

    def update(self, name=None, params=None, subscribed=True):
        if self._has_error():
            return self

        data = {}
        if name:
            data['name'] = name

        if params:
            data['vars'] = json.dumps(params) if isinstance(
                params, dict) else params

        if not subscribed:
            data['subscribed'] = 'no'

        res = self.mailgun._put(BASE_URL + '/lists/' +
                                self.mailinglist.address + '/members/' + self.address, data)
        self.text = res.text
        self.json = res.json
        if (res.status_code != 200):
            self._set_error(
                res.status_code,
                'Error during updating member {} from mailing list.'.format(
                    self.address,
                    self.mailinglist.address,
                ),
            )
        return self

    def get(self):
        if self._has_error():
            return self

        res = self.mailgun._get(BASE_URL + '/lists/' +
                                self.mailinglist.address + '/members/' + self.address)
        self.text = res.text
        self.json = res.json
        if (res.status_code != 200):
            self._set_error(
                res.status_code,
                'Error during retrieving member {} from mailing list.'.format(
                    self.address,
                    self.mailinglist.address,
                ),
            )
        return self

    def delete(self):
        if self._has_error():
            return self

        res = self.mailgun._delete(BASE_URL + '/lists/' +
                                   self.mailinglist.address + '/members/' + self.address)
        self.text = res.text
        self.json = res.json
        if (res.status_code != 200):
            self._set_error(
                res.status_code,
                'Error during deleting member {} from mailing list.'.format(
                    self.address,
                    self.mailinglist.address,
                ),
            )
        return self