tadashi-aikawa/owlmixin

owlmixin/util.py

# coding: utf-8

# The location of `import yaml` is intentionally not optimized.
# pylint: disable=wrong-import-order,duplicate-code

import codecs
import csv
import io
import json
import re
from math import ceil, floor
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Union
from unicodedata import east_asian_width
from urllib.request import urlopen

import yaml


class CrLfCsvDialect(csv.Dialect):
    delimiter = ","
    quotechar = '"'
    doublequote = True
    skipinitialspace = True
    lineterminator = "\r\n"
    quoting = csv.QUOTE_MINIMAL


class CrLfTsvDialect(csv.Dialect):
    delimiter = "\t"
    quotechar = '"'
    doublequote = True
    skipinitialspace = True
    lineterminator = "\r\n"
    quoting = csv.QUOTE_MINIMAL


class LfCsvDialect(csv.Dialect):
    delimiter = ","
    quotechar = '"'
    doublequote = True
    skipinitialspace = True
    lineterminator = "\n"
    quoting = csv.QUOTE_MINIMAL


class LfTsvDialect(csv.Dialect):
    delimiter = "\t"
    quotechar = '"'
    doublequote = True
    skipinitialspace = True
    lineterminator = "\n"
    quoting = csv.QUOTE_MINIMAL


csv.register_dialect("crlf_csv", CrLfCsvDialect)
csv.register_dialect("crlf_tsv", CrLfTsvDialect)
csv.register_dialect("lf_csv", LfCsvDialect)
csv.register_dialect("lf_tsv", LfTsvDialect)


def get_dialect_name(crlf: bool, tsv: bool) -> str:
    line_break = "crlf" if crlf else "lf"
    file = "tsv" if tsv else "csv"
    return f"{line_break}_{file}"
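
# Illustrative usage of get_dialect_name (not part of the original module):
#
#     >>> get_dialect_name(crlf=True, tsv=False)
#     'crlf_csv'
#     >>> get_dialect_name(crlf=False, tsv=True)
#     'lf_tsv'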


class MyDumper(yaml.SafeDumper):
    # pylint: disable=too-many-ancestors
    # Dumper that indents block sequence items under their parent key.
    def increase_indent(self, flow=False, indentless=False):
        return super(MyDumper, self).increase_indent(flow, False)


# Always load YAML string scalars via construct_scalar (keep them as plain text).
def construct_yaml_str(self, node):
    return self.construct_scalar(node)


yaml.SafeLoader.add_constructor("tag:yaml.org,2002:str", construct_yaml_str)


def replace_keys(d, keymap, force_snake_case):
    """
    :param dict d:
    :param Dict[unicode, unicode] keymap:
    :param bool force_snake_case:
    :rtype: Dict[unicode, unicode]
    """
    return {
        to_snake(keymap.get(k, k)) if force_snake_case else keymap.get(k, k): v
        for k, v in d.items()
    }
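
# Illustrative usage of replace_keys (assumed inputs; not part of the original module):
#
#     >>> replace_keys({"firstName": 1, "self": 2}, {"self": "self_"}, True)
#     {'first_name': 1, 'self_': 2}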


def to_snake(value):
    """Convert a dictionary key to snake_case.

    :param str value:
    :rtype: str
    """
    return (
        re.sub(r"((?<!^)[A-Z])", "_\\1", value.strip("<>-")).lower().replace("-", "_")
    )
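
# Illustrative examples of to_snake (not part of the original module):
#
#     >>> to_snake("DealerName")
#     'dealer_name'
#     >>> to_snake("<content-type>")
#     'content_type'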


def load_json(json_str):
    """
    :param unicode json_str:
    :rtype: dict | list
    """
    return json.loads(json_str)
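
# Illustrative usage of load_json (not part of the original module):
#
#     >>> load_json('{"id": 1, "name": "Ichiro"}')
#     {'id': 1, 'name': 'Ichiro'}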


def load_jsonf(fpath, encoding):
    """
    :param unicode fpath:
    :param unicode encoding:
    :rtype: dict | list
    """
    with codecs.open(fpath, encoding=encoding) as f:
        return json.load(f)


def load_yaml(yaml_str):
    """
    :param unicode yaml_str:
    :rtype: dict | list
    """
    return yaml.safe_load(yaml_str)
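
# Illustrative usage of load_yaml (not part of the original module):
#
#     >>> load_yaml("id: 1\nname: Ichiro\n")
#     {'id': 1, 'name': 'Ichiro'}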


def load_yamlf(fpath, encoding):
    """
    :param unicode fpath:
    :param unicode encoding:
    :rtype: dict | list
    """
    with codecs.open(fpath, encoding=encoding) as f:
        return yaml.safe_load(f)


def load_csvf(
    fpath: str, fieldnames: Optional[Sequence[str]], encoding: str
) -> Iterator[dict]:
    """
    :param fpath:
    :param fieldnames:
    :param encoding:
    :return: Iterator of dict
    """
    with open(fpath, mode="r", encoding=encoding) as f:
        # Guess the dialect (delimiter, quoting, ...) from the first 8 KiB, then rewind.
        snippet = f.read(8192)
        f.seek(0)

        dialect = csv.Sniffer().sniff(snippet)
        dialect.skipinitialspace = True
        reader = csv.DictReader(f, fieldnames=fieldnames, dialect=dialect)  # type: ignore
        for d in reader:
            yield d
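
# Usage sketch for load_csvf (assumes a hypothetical "members.csv"; not part of the original module):
#
#     members.csv:
#         id,name
#         1,Ichiro
#         2,Jiro
#
#     >>> list(load_csvf("members.csv", fieldnames=None, encoding="utf-8"))
#     [{'id': '1', 'name': 'Ichiro'}, {'id': '2', 'name': 'Jiro'}]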


def load_json_url(url):
    """
    :param unicode url:
    :rtype: dict | list
    """
    return json.loads(urlopen(url).read())


def dump_csv(
    data: Iterable[dict],
    fieldnames: Sequence[str],
    *,
    with_header: bool = False,
    crlf: bool = False,
    tsv: bool = False,
) -> str:
    """
    :param data:
    :param fieldnames:
    :param with_header:
    :param crlf:
    :param tsv:
    :return: Csv string
    """

    def force_str(v):
        # XXX: Double quotes in nested dict/list values behave strangely in the CSV output,
        # so replace them with single quotes (root cause unknown).
        return dump_json(v).replace('"', "'") if isinstance(v, (dict, list)) else v

    with io.StringIO() as sio:
        dialect = get_dialect_name(crlf, tsv)
        writer = csv.DictWriter(
            sio, fieldnames=fieldnames, dialect=dialect, extrasaction="ignore"
        )
        if with_header:
            writer.writeheader()
        for x in data:
            writer.writerow({k: force_str(v) for k, v in x.items()})
        sio.seek(0)
        return sio.read()
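
# Illustrative usage of dump_csv (assumed data; not part of the original module):
#
#     >>> dump_csv([{"id": 1, "name": "Ichiro"}], ["id", "name"], with_header=True)
#     'id,name\n1,Ichiro\n'
#     >>> dump_csv([{"id": 1, "name": "Ichiro"}], ["id", "name"], tsv=True, crlf=True)
#     '1\tIchiro\r\n'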


def dump_csvf(
    data: Iterable[dict],
    fieldnames: Sequence[str],
    *,
    fpath: str,
    encoding: str,
    with_header: bool = False,
    crlf: bool = False,
    tsv: bool = False,
) -> str:
    """
    :param data:
    :param fieldnames:
    :param fpath: write path
    :param encoding: encoding
    :param with_header:
    :param crlf:
    :param tsv:
    :return: written path
    """

    def force_str(v):
        # XXX: Double quotes in nested dict/list values behave strangely in the CSV output,
        # so replace them with single quotes (root cause unknown).
        return dump_json(v).replace('"', "'") if isinstance(v, (dict, list)) else v

    with codecs.open(fpath, mode="w", encoding=encoding) as f:
        dialect = get_dialect_name(crlf, tsv)
        writer = csv.DictWriter(
            f, fieldnames=fieldnames, dialect=dialect, extrasaction="ignore"
        )
        if with_header:
            writer.writeheader()
        for x in data:
            writer.writerow({k: force_str(v) for k, v in x.items()})

    return fpath


def dump_json(data, indent=None):
    """
    :param list | dict data:
    :param Optional[int] indent:
    :rtype: unicode
    """
    return json.dumps(
        data, indent=indent, ensure_ascii=False, sort_keys=True, separators=(",", ": ")
    )
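
# Illustrative usage of dump_json (not part of the original module):
#
#     >>> dump_json({"name": "Ichiro", "id": 1})
#     '{"id": 1,"name": "Ichiro"}'
#     >>> print(dump_json({"id": 1}, indent=2))
#     {
#       "id": 1
#     }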


def dump_jsonf(
    data: Union[list, dict], *, fpath: str, encoding: str, indent=None
) -> str:
    """
    :param data: list | dict data
    :param fpath: write path
    :param encoding: encoding
    :param indent:
    :return: written path
    """
    with codecs.open(fpath, mode="w", encoding=encoding) as f:
        f.write(dump_json(data, indent))
        return fpath


def dump_yaml(data):
    """
    :param list | dict data:
    :rtype: unicode
    """
    return yaml.dump(
        data,
        indent=2,
        encoding=None,
        allow_unicode=True,
        default_flow_style=False,
        Dumper=MyDumper,
    )
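
# Illustrative usage of dump_yaml (not part of the original module; exact indentation depends on MyDumper):
#
#     >>> print(dump_yaml({"id": 1, "names": ["Ichiro", "Jiro"]}), end="")
#     id: 1
#     names:
#       - Ichiro
#       - Jiro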


def dump_yamlf(data: Union[list, dict], *, fpath: str, encoding: str) -> str:
    """
    :param data: list | dict data
    :param fpath: write path
    :param encoding: encoding
    :return: written path
    """
    with codecs.open(fpath, mode="w", encoding=encoding) as f:
        f.write(dump_yaml(data))
        return fpath


def dump_table(data: List[dict], fieldnames: Sequence[str]) -> str:
    """
    :param data:
    :param fieldnames:
    :return: Table string
    """

    def min3(num: int) -> int:
        # Keep every column at least 3 characters wide (room for the "---" separator row).
        return 3 if num < 4 else num

    width_by_col: Dict[str, int] = {
        f: min3(max([string_width(str(d.get(f))) for d in data] + [string_width(f)]))
        for f in fieldnames
    }

    def fill_spaces(word: str, width: int, center=False):
        """aaa, 4 => ' aaa  '"""
        to_fills: int = width - string_width(word)
        return (
            f" {' ' * floor(to_fills / 2)}{word}{' ' * ceil(to_fills / 2)} "
            if center
            else f" {word}{' ' * to_fills} "
        )

    def to_record(r: dict) -> str:
        return f"|{'|'.join([fill_spaces(str(r.get(f)), width_by_col[f]) for f in fieldnames])}|"

    lf = "\n"
    return f"""
|{'|'.join([fill_spaces(x, width_by_col[x], center=True) for x in fieldnames])}|
|{'|'.join([fill_spaces(width_by_col[x] * "-", width_by_col[x]) for x in fieldnames])}|
{lf.join([to_record(x) for x in data])}
""".lstrip()


def string_width(word: str) -> int:
    """
    :param word:
    :return: Width of word

    Usage:

        >>> string_width('abc')
        3
        >>> string_width('Abしー')
        6
        >>> string_width('')
        0
    """
    return sum(map(lambda x: 2 if east_asian_width(x) in "FWA" else 1, word))