portfoliome/foil

View on GitHub
foil/fileio.py

Summary

Maintainability
A
3 hrs
Test Coverage
"""
foil.fileio contains helper utilities for file reading and writing.
"""

import csv
from collections import namedtuple
from contextlib import contextmanager
from io import BufferedReader
from zipfile import ZipFile

from foil.filters import create_indexer


class TextReader:
    """Iterate over the lines of a text file.

    The file is opened anew each time the reader is iterated, so one
    instance can be looped over more than once.

    Parameters
    ----------
    path : Absolute path to text file.
    encoding : File encoding.
    """

    def __init__(self, path: str, encoding: str):
        self.encoding = encoding
        self.path = path

    def __iter__(self):
        """Yield each line with carriage returns and line feeds stripped."""

        with open(self.path, mode='r', encoding=self.encoding) as handle:
            yield from (raw.strip('\r\n') for raw in handle)


class DelimitedReader:
    """Read delimited text stream into namedtuple Records.

    The first row of the stream is consumed as the header when the reader
    is constructed. Every following row is converted field by field and
    returned as a ``Record`` namedtuple.

    The reader can be used as a context manager; leaving the ``with``
    block calls :meth:`close`.

    Parameters
    ----------
    stream: stream of text (any iterable of lines accepted by csv.reader).
    dialect: delimited file attributes.
    fields: Record field names.
    converters: casting functions to cast fields to Python objects,
      applied positionally, one per field.
      Utilize make_converters from the parsers module for the general
      use case.

    Attributes
    ----------
    header: first row of the stream as parsed by csv.reader.
    reader: underlying csv.reader, positioned after the header.
    converters: the casting functions passed to the constructor.
    Record: namedtuple class built from ``fields``.

    Factory Methods
    ---------------
    See factory methods for alternative constructors.
    """

    def __init__(self, stream,
                 dialect: csv.Dialect, fields: list, converters: list):
        # Remember the stream so close() can release it later.
        self._stream = stream

        reader = csv.reader(stream, dialect=dialect)
        self.header = next(reader)
        self.reader = reader
        self.converters = converters
        self.Record = namedtuple('Record', fields)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next row as a Record of converted values."""

        Record = self.Record
        record = Record._make(type_converter(item) for type_converter, item
                              in zip(self.converters, next(self.reader)))
        return record

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying stream when it supports closing.

        Streams without a ``close`` method (e.g. a plain list of lines)
        are left untouched.
        """

        closer = getattr(getattr(self, '_stream', None), 'close', None)
        if closer is not None:
            closer()

    @property
    def file_line_number(self):
        """Number of source lines read so far by the csv reader."""

        return self.reader.line_num

    @classmethod
    def from_file(cls, path, encoding, dialect, fields, converters):
        """Read delimited text from a text file.

        The returned reader holds the file open; release it with
        ``close()`` or by using the reader in a ``with`` statement.
        """

        # newline='' lets the csv module do its own newline handling, so
        # line breaks embedded in quoted fields are preserved verbatim.
        stream = open(path, 'r', encoding=encoding, newline='')
        try:
            return cls(stream, dialect, fields, converters)
        except Exception:
            # Do not leak the file handle when construction fails
            # (e.g. empty file or invalid field names).
            stream.close()
            raise

    @classmethod
    def from_zipfile(cls, path, filename, encoding, dialect, fields, converters):
        """Read delimited text from zipfile."""

        stream = ZipReader(path, filename).readlines(encoding)
        try:
            return cls(stream, dialect, fields, converters)
        except Exception:
            # Closing the line generator lets the archive be released.
            stream.close()
            raise

    @staticmethod
    def discover_headers(stream, dialect):
        """Return a generator of header names read from ``stream``.

        The header row is read immediately; each name is passed through
        ``parse_quoted_string`` lazily as the result is consumed.
        """
        from foil.parsers import parse_quoted_string

        headers = DelimitedReader(stream, dialect=dialect,
                                  fields=[], converters=[]).header

        return (parse_quoted_string(field) for field in headers)

    @staticmethod
    def file_headers(path, encoding, dialect):
        """Return the header names of a delimited text file."""

        # The header row is fully read before discover_headers returns,
        # so the file can be closed right away.
        with open(path, 'r', encoding=encoding, newline='') as stream:
            return DelimitedReader.discover_headers(stream, dialect=dialect)

    @staticmethod
    def zipfile_headers(path, filename, encoding, dialect):
        """Return the header names of a delimited file inside a zipfile."""

        stream = ZipReader(path, filename).readlines(encoding)
        try:
            return DelimitedReader.discover_headers(stream, dialect)
        finally:
            # Only the first line is needed; close the suspended line
            # generator instead of leaving the archive open.
            stream.close()


class DelimitedSubsetReader(DelimitedReader):
    """Read delimited text into namedtuple Records ignoring certain fields.

    Parameters
    ----------
    stream: stream of text.
    dialect: delimited file attributes.
    fields: Record field names.
    converters: casting functions to cast fields to Python objects.
    field_index: passed to ``create_indexer``; the resulting indexer is
      applied to every raw row before the converters run.
    """

    def __init__(self, stream, dialect: csv.Dialect, fields: list,
                 converters: list, field_index: list):
        super().__init__(stream, dialect, fields, converters)

        self.indexer = create_indexer(field_index)

    def __next__(self):
        """Return the next row, reduced by the indexer, as a Record."""

        indexer = self.indexer
        Record = self.Record

        row = indexer(next(self.reader))
        record = Record._make(type_converter(item) for type_converter, item
                              in zip(self.converters, row))

        return record

    @classmethod
    def from_file(cls, path, encoding, dialect, fields, converters, field_index):
        """Read delimited text from a text file."""

        # newline='' lets the csv module do its own newline handling, so
        # line breaks embedded in quoted fields are preserved verbatim.
        stream = open(path, 'r', encoding=encoding, newline='')
        try:
            return cls(stream, dialect, fields, converters, field_index)
        except Exception:
            # Do not leak the file handle when construction fails.
            stream.close()
            raise

    @classmethod
    def from_zipfile(cls, path, filename, encoding, dialect, fields,
                     converters, field_index):
        """Read delimited text from zipfile."""

        stream = ZipReader(path, filename).readlines(encoding)
        try:
            return cls(stream, dialect, fields, converters, field_index)
        except Exception:
            # Closing the line generator lets the archive be released.
            stream.close()
            raise


class ZipReader:
    """Reads a single member of a zip file.

    Parameters
    ----------
    path : Absolute path to zip file archive.
    filename : File name in archive to read.
    """

    def __init__(self, path: str, filename: str):
        self.filename = filename
        self.path = path

    def read(self, encoding):
        """Return the whole member decoded with ``encoding``."""

        payload = self.read_bytes()
        return payload.decode(encoding)

    def read_bytes(self):
        """Return the whole member as a byte string."""

        with ZipFile(self.path, mode='r') as zipped:
            payload = zipped.read(self.filename)
        return payload

    def readlines(self, encoding):
        """Return a generator of decoded lines, line endings removed."""

        raw_lines = self.readlines_bytes()
        return (raw.decode(encoding) for raw in raw_lines)

    def readlines_bytes(self):
        """Yield the member line by line as bytes, without trailing CR/LF."""

        with open_zipfile_archive(self.path, self.filename) as member:
            yield from (raw.rstrip(b'\r\n') for raw in member)


@contextmanager
def open_zipfile_archive(path, filename):
    """Context manager yielding a buffered binary reader for a zip member.

    Parameters
    ----------
    path : Path to zip file archive.
    filename : File name in archive to open.

    Both the member reader and the archive are closed when the ``with``
    block is left.
    """

    with ZipFile(path, mode='r') as zipped, \
            BufferedReader(zipped.open(filename, mode='r')) as member:
        yield member


def concatenate_streams(streams):
    """Yield every item of each stream in turn, as one continuous stream.

    Parameters
    ----------
    streams : Iterable of iterables; consumed lazily, one after another.
    """

    remaining = iter(streams)
    for current in remaining:
        yield from current