kalefranz/auxlib

View on GitHub
auxlib/logz.py

Summary

Maintainability
C
1 day
Test Coverage
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
from json import JSONEncoder
from logging import getLogger, INFO, Formatter, StreamHandler, DEBUG
from pprint import pformat
from sys import stderr

from . import NullHandler
from .compat import text_type

log = getLogger(__name__)
root_log = getLogger()

NullHandler = NullHandler

DEBUG_FORMATTER = Formatter(
    "[%(levelname)s] [%(asctime)s.%(msecs)03d] %(process)d %(name)s:%(funcName)s(%(lineno)d):\n"
    "%(message)s\n",
    "%Y-%m-%d %H:%M:%S")

INFO_FORMATTER = Formatter(
    "[%(levelname)s] [%(asctime)s.%(msecs)03d] %(process)d %(name)s(%(lineno)d): %(message)s\n",
    "%Y-%m-%d %H:%M:%S")


def set_root_level(level=INFO):
    root_log.setLevel(level)


def attach_stderr(level=INFO):
    has_stderr_handler = any(handler.name == 'stderr' for handler in root_log.handlers)
    if not has_stderr_handler:
        handler = StreamHandler(stderr)
        handler.name = 'stderr'
        if level is not None:
            handler.setLevel(level)
        handler.setFormatter(DEBUG_FORMATTER if level == DEBUG else INFO_FORMATTER)
        root_log.addHandler(handler)
        return True
    else:
        return False


def detach_stderr():
    for handler in root_log.handlers:
        if handler.name == 'stderr':
            root_log.removeHandler(handler)
            return True
    return False


def initialize_logging(level=INFO):
    attach_stderr(level)


class DumpEncoder(JSONEncoder):
    def default(self, obj):
        if hasattr(obj, 'dump'):
            return obj.dump()
        # Let the base class default method raise the TypeError
        return super(DumpEncoder, self).default(obj)
_DUMPS = DumpEncoder(indent=2, ensure_ascii=False, sort_keys=True).encode  # NOQA


def jsondumps(obj):
    return _DUMPS(obj)


def fullname(obj):
    return obj.__module__ + "." + obj.__class__.__name__


request_header_sort_dict = {
    'Host': '\x00\x00',
    'User-Agent': '\x00\x01',
}
def request_header_sort_key(item):  # NOQA
    return request_header_sort_dict.get(item[0], item[0].lower())


response_header_sort_dict = {
    'Content-Length': '\x7e\x7e\x61',
    'Connection': '\x7e\x7e\x62',
}
def response_header_sort_key(item):  # NOQA
    return response_header_sort_dict.get(item[0], item[0].lower())


def stringify(obj, content_max_len=0):
    def bottle_builder(builder, bottle_object):
        builder.append("{0} {1}{2} {3}".format(bottle_object.method,
                                               bottle_object.path,
                                               bottle_object.environ.get('QUERY_STRING', ''),
                                               bottle_object.get('SERVER_PROTOCOL')))
        builder += ["{0}: {1}".format(key, value) for key, value in bottle_object.headers.items()]
        builder.append('')
        body = bottle_object.body.read().strip()
        if body:
            builder.append(body)

    def requests_models_PreparedRequest_builder(builder, request_object):
        builder.append(">>{0} {1} {2}".format(request_object.method, request_object.path_url,
                                              request_object.url.split(':', 1)[0].upper()))
        builder.extend("> {0}: {1}".format(key, value)
                       for key, value in sorted(request_object.headers.items(),
                                                key=request_header_sort_key))
        builder.append('')
        if request_object.body:
            builder.append(request_object.body)

    def requests_models_Response_builder(builder, response_object):
        builder.append("<<{0} {1} {2}".format(response_object.url.split(':', 1)[0].upper(),
                                              response_object.status_code, response_object.reason))
        builder.extend("< {0}: {1}".format(key, value)
                       for key, value in sorted(response_object.headers.items(),
                                                key=response_header_sort_key))
        elapsed = text_type(response_object.elapsed).split(':', 1)[-1]
        builder.append('< Elapsed: {0}'.format(elapsed))
        if content_max_len:
            builder.append('')
            content_type = response_object.headers.get('Content-Type')
            if content_type == 'application/json':
                content = pformat(response_object.json(), indent=2)
                content = content[:content_max_len] if len(content) > content_max_len else content
                builder.append(content)
                builder.append('')
            elif content_type is not None and (content_type.startswith('text/')
                                               or content_type == 'application/xml'):
                text = response_object.text
                content = text[:content_max_len] if len(text) > content_max_len else text
                builder.append(content)

    try:
        name = fullname(obj)
        builder = ['']  # start with new line
        if name.startswith('bottle.'):
            bottle_builder(builder, obj)
        elif name.endswith('requests.models.PreparedRequest'):
            requests_models_PreparedRequest_builder(builder, obj)
        elif name.endswith('requests.models.Response'):
            if getattr(obj, 'request'):
                requests_models_PreparedRequest_builder(builder, obj.request)
            else:
                log.info("request is 'None' for Response object with url %s", obj.url)
            requests_models_Response_builder(builder, obj)
        else:
            return None
        builder.append('')  # end with new line
        return "\n".join(builder)
    except Exception as e:
        log.exception(e)