rayepps/dynamof

View on GitHub
dynofunc/core/exceptions.py

Summary

Maintainability
A
30 mins
Test Coverage

class DynofuncException(Exception):
    """Base class for the exceptions defined in this module.

    The current error text is stored on ``self.message`` and is also passed
    to ``Exception`` so that ``str(exc)`` shows it.
    """

    def __init__(self, message):
        self.message = message
        super().__init__(message)

    def info(self, add_message):
        """ Adds details to the base error message and returns ``self``
        so the call can be chained (e.g. ``raise exc.info('...')``).
        """
        self.message = f'{self.message}; {add_message}'
        # Exception renders str(exc) from ``args``, not from ``message``;
        # keep the two in step so the added details are actually visible.
        self.args = (self.message,)
        return self

class PreexistingTableException(DynofuncException):
    """Error for an attempt to create a table that already exists."""

    def __init__(self):
        super().__init__("Attempted to create a table that already exists")

    @classmethod
    def matches(cls, err):
        """Return True when the `Message` parsed from `err` is exactly
        'Cannot create preexisting table'.
        """
        # parse() yields (message, code); only the message matters here.
        boto_message = parse(err)[0]
        return boto_message == 'Cannot create preexisting table'

class TableDoesNotExistException(DynofuncException):
    """Error for an operation attempted on a table that does not exist."""

    def __init__(self):
        super().__init__("Attempted to do operation on a table that does not exist")

    @classmethod
    def matches(cls, err):
        """Return True when the `Message` parsed from `err` is exactly
        'Cannot do operations on a non-existent table'.
        """
        # parse() yields (message, code); only the message matters here.
        boto_message = parse(err)[0]
        return boto_message == 'Cannot do operations on a non-existent table'

class ConditionNotMetException(DynofuncException):
    """Error for a request whose conditions were not satisfied by any item."""

    def __init__(self):
        message = "Could not find an item that satisfied the given conditions"
        super().__init__(message)

    @classmethod
    def matches(cls, err):
        """Return True when the `Code` parsed from `err` is
        'ConditionalCheckFailedException'.
        """
        # Unlike the sibling exceptions, this one is identified by the
        # error code rather than by the message text.
        _, code = parse(err)
        return code == 'ConditionalCheckFailedException'

class BadGatewayException(DynofuncException):
    """Error for a failure to communicate with dynamo."""

    def __init__(self):
        super().__init__("Issue communicating with dynamo")

    @classmethod
    def matches(cls, err):
        """Return True when the `Message` parsed from `err` is exactly
        'Bad Gateway'.
        """
        # parse() yields (message, code); only the message matters here.
        boto_message = parse(err)[0]
        return boto_message == 'Bad Gateway'

class UnknownDatabaseException(DynofuncException):
    """Error wrapping a boto client error that has no dedicated exception class."""

    def __init__(self, unknown_boto_client_err):
        """Build the error text from the `Message` and `Code` parsed out of
        `unknown_boto_client_err` (either may be None if not present).
        """
        message, code = parse(unknown_boto_client_err)
        message = f'An unknown exception occurred when executing request to dynamo: {message} - {code}'
        super().__init__(message)

def factory(boto_client_err):
    """Translate a boto client error into one of this module's exceptions.

    Each candidate class is asked, in order, whether it `matches` the error;
    an instance of the first class that does is returned. When none match,
    the error is wrapped in an `UnknownDatabaseException`.
    """
    # Order matters: the first matching class wins.
    candidates = (
        BadGatewayException,
        TableDoesNotExistException,
        ConditionNotMetException,
        PreexistingTableException,
    )
    for exc_type in candidates:
        if exc_type.matches(boto_client_err):
            return exc_type()
    return UnknownDatabaseException(boto_client_err)

def parse(exc):
    """Takes in an exception and returns the boto
    `Message, Code` properties if they exist - else `None, None`

    `exc` may be None or an object without a `response` attribute; both
    yield `None, None`. A key missing from the `Error` entry yields None
    in that position.
    """
    response = getattr(exc, 'response', None)
    if response is None:
        return None, None
    # ``or {}`` also covers an 'Error' key that is present but None/empty,
    # which would otherwise raise AttributeError on the lookups below.
    error = response.get('Error') or {}
    return error.get('Message'), error.get('Code')