derwentx/Xero-Map-Generator

View on GitHub
xero_map_gen/contain.py

Summary

Maintainability
B
4 hrs
Test Coverage
#!/usr/bin/env python2
"""Container classes and utilities for containing data."""

import csv
import heapq
from copy import copy

import tabulate

from .helper import SanitationUtils
from .log import PKG_LOGGER

class XeroObjectGroup(object):
    @classmethod
    def dump_items_csv(cls, items, dump_path='items.csv', names=None, flatten_attr=None):
        try:
            with open(dump_path, 'w'):
                pass
        except IOError as exc:
            PKG_LOGGER.error("file " + dump_path + " could not be opened. Try closing the file. " + repr(exc))
            try:
                input("Press enter to continue")
            except SyntaxError:
                pass

        with open(dump_path, 'w') as dump_path:
            writer = csv.DictWriter(dump_path, names, extrasaction='ignore')
            writer.writeheader()
            for item in items:
                if flatten_attr:
                    item = getattr(item, flatten_attr)()
                elif hasattr(item, '_data'):
                    item = getattr(item, '_data')
                item = dict([
                    (
                        SanitationUtils.to_ascii(key, errors='ignore'),
                        SanitationUtils.to_ascii(value, errors='ignore')
                    ) for key, value in item.items()
                ])
                writer.writerow(item)

class XeroContactGroup(XeroObjectGroup):
    names_raw_csv = {
        'ContactID': 'ContactID',
        'ContactGroups': 'ContactGroups',
        'ContactNumber': 'ContactNumber',
        'ContactStatus': 'ContactStatus',
        'EmailAddress': 'EmailAddress',
        'Name': 'Name',
        'Address': 'Address',
        'Phone': 'Phone',
    }

    @classmethod
    def dump_contacts_raw_csv(cls, contacts, dump_path='contacts-raw.csv'):
        cls.dump_items_csv(
            contacts.flatten_raw, dump_path, cls.names_raw_csv, 'flatten_raw')

    @classmethod
    def dump_contacts_verbose_csv(cls, contacts, dump_path='contacts-verbose.csv'):
        names = copy(cls.names_raw_csv)
        for key in ['Address', 'Phone']:
            del names[key]
        names.update({
            'MAIN Address': 'MAIN Address',
            'POBOX Address': 'POBOX Address',
            'STREET Address': 'STREET Address',
            'DELIVERY Address': 'DELIVERY Address',
            'MAIN Phone': 'Main Phone',
            'DEFAULT Phone': 'DEFAULT Phone',
            'DDI Phone': 'DDI Phone',
            'MOBILE Phone': 'MOBILE Phone',
            'FAX Phone': 'FAX Phone',
        })
        cls.dump_items_csv(contacts, dump_path, names, 'flatten_verbose')

    names_sanitized_csv = {
        'Name': 'Company Name',
        'AddressLine': 'Address',
        'AddressArea': 'Area',
        'AddressPostcode': 'Postcode',
        'AddressState': 'State',
        'AddressCountry': 'Country',
        'Phone': 'Phone',
        'EmailAddress': 'Email',
    }

    @classmethod
    def dump_contacts_sanitized_csv(cls, contacts, dump_path='contacts-sanitized.csv'):
        names = copy(cls.names_sanitized_csv)
        cls.dump_items_csv(contacts, dump_path, names, 'flatten_sanitized')

    @classmethod
    def dump_contacts_sanitized_table(cls, contacts):
        return tabulate.tabulate(
            [contact.flatten_sanitized() for contact in contacts],
            headers='keys'
        )

class XeroObject(object):
    def _primary_property(
            self, properties, type_key, type_priority, fn_empty, default=None):
        """ Abstract main_address and main_phone. """
        if not properties:
            return default
        if len(properties) == 1:
            return properties[0]
        nonempty_properties = []
        for property_ in properties:
            type_ = property_.get(type_key)
            try:
                priority = type_priority.index(type_)
            except ValueError:
                priority = len(type_priority)
            if not fn_empty(property_):
                heapq.heappush(nonempty_properties, (priority, property_))
        if nonempty_properties:
            _, primary_property = heapq.heappop(nonempty_properties)
            return primary_property
        return default


class XeroContact(XeroObject):
    def __init__(self, data):
        self.data = data

    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, value):
        self._data = value
        self._main_address = {}
        self._main_phone = {}

    address_type_priority = [
        'STREET', 'POBOX', 'DELIVERY'
    ]

    @property
    def main_address(self):
        if getattr(self, '_main_address', None):
            return self._main_address

        def address_empty(address):
            return not any([
                value for key, value in address.items() \
                if key.startswith("AddressLine")
            ])

        self._main_address = self._primary_property(
            self.data.get('Addresses', []), 'AddressType',
            self.address_type_priority, address_empty, default={}
        )
        return self._main_address

    phone_type_priority = [
        'DEFAULT', 'DDI', 'MOBILE'
    ]

    @property
    def main_phone(self):
        if getattr(self, '_main_phone', None):
            return self._main_phone

        def phone_empty(phone):
            return 'PhoneNumber' not in phone

        self._main_phone = self._primary_property(
            self.data.get('Phones', []), 'PhoneType',
            self.phone_type_priority, phone_empty, default={}
        )
        return self._main_phone


    @property
    def company_name(self):
        if 'Name' in self.data and self.data.get('Name') :
            return self.data['Name']

    @property
    def main_address_lines(self):
        main_address = self.main_address
        lines = []
        for line in range(1, 5):
            key = "AddressLine%d" % line
            if key not in main_address:
                continue
            line = main_address[key]
            if line:
                lines.append(line)
        return ", ".join(lines)



    @property
    def main_address_area(self):
        main_address = self.main_address
        return main_address.get('City')

    @property
    def main_address_postcode(self):
        main_address = self.main_address
        return main_address.get('PostalCode')

    @property
    def main_address_state(self):
        main_address = self.main_address
        return main_address.get('Region')

    @classmethod
    def convert_country_code(cls, country_code):
        # TODO: complete this
        if country_code == 'AU':
            return 'Australia'
        return country_code

    @property
    def main_address_country(self):
        main_address = self.main_address
        country_code = main_address.get('Country', '') or 'AU'
        return self.convert_country_code(country_code)

    @property
    def phone(self):
        main_phone = self.main_phone
        response = None
        if not (main_phone and main_phone.get('PhoneNumber')):
            return response
        response = main_phone['PhoneNumber']
        if not main_phone.get('PhoneAreaCode'):
            return response
        response = "%s %s" % (
            main_phone['PhoneAreaCode'],
            response
        )
        if not main_phone.get('PhoneCountryCode'):
            return response
        response = "(%s) %s" % (
            main_phone['PhoneCountryCode'],
            response
        )
        return response

    @property
    def archived(self):
        return self.data.get('ContactStatus') == 'ARCHIVED'

    @property
    def active(self):
        return self.data.get('ContactStatus') == 'ACTIVE'

    @property
    def contact_id(self):
        return self.data.get('ContactID')

    def flatten_raw(self):
        flattened = dict()
        for key in [
            'ContactID', 'ContactGroups', 'ContactNumber',
            'ContactStatus', 'EmailAddress', 'Name'
        ]:
            flattened[key] = self.data.get(key)
        for data_key, data_type, type_default in [
            ('Addresses', 'Address', 'POBOX'),
            ('Phones', 'Phone', 'DEFAULT'),
        ]:
            type_key = '%sType' % data_type
            for property_ in self.data[data_key]:
                property_ = copy(property_)
                property_type = property_.pop(type_key, type_default)
                flat_key = '%s %s' % (data_type, property_type)
                assert flat_key not in flattened, "duplicate key %s being added to flat" % flat_key
                flattened[flat_key] = property_

        return flattened

    def flatten_verbose(self):
        flattened = self.flatten_raw()
        for flat_key, attribute in [
            ('MAIN Address', 'main_address'),
            ('MAIN Phone', 'main_phone')
        ]:
            flattened[flat_key] = getattr(self, attribute)
        return flattened

    def flatten_sanitized(self):
        flattened = dict()
        for key in [
            'ContactID', 'EmailAddress', 'Name'
        ]:
            flattened[key] = self.data.get(key)

        for flat_key, attribute in [
            ('AddressLine', 'main_address_lines'),
            ('AddressArea', 'main_address_area'),
            ('AddressPostcode', 'main_address_postcode'),
            ('AddressState', 'main_address_state'),
            ('AddressCountry', 'main_address_country'),
            ('Phone', 'phone'),
        ]:
            flattened[flat_key] = getattr(self, attribute)
        return flattened