yukihiko-shinoda/god-slayer

View on GitHub
godslayer/reader_operator_factory.py

Summary

Maintainability
A
50 mins
Test Coverage
"""Factory for ReaderOperator."""
from pathlib import Path
from typing import List, Optional

from godslayer.csv.reader_operators.header_skipper import HeaderSkipper
from godslayer.csv.reader_operators.partition_skip_record_before_footer_reader import (
    PartitionSkipRecordBeforeFooterReader,
)
from godslayer.csv.reader_operators.partition_skip_record_reader import PartitionSkipRecordReader
from godslayer.csv.reader_operators.reader_operator import ReaderOperator
from godslayer.csv.reader_operators.record_before_footer_reader import RecordBeforeFooterReader
from godslayer.csv.reader_operators.record_reader import RecordReader
from godslayer.last_line_detector import LastLineDetector

__all__ = ["ReaderOperatorFactory"]


class ReaderOperatorFactory:
    """Factory for ReaderOperator."""

    def __init__(
        self,
        path_to_file: Path,
        *,
        header: Optional[List[str]] = None,
        partition: Optional[List[str]] = None,
        footer: Optional[List[str]] = None,
        # pylint: disable=duplicate-code
        encoding: str = "utf-8"
    ):
        self.path_to_file = path_to_file
        self.header = header
        self.partition = partition
        self.footer = footer
        self.encoding = encoding

    def create(self) -> ReaderOperator:
        """Creates ReaderOperator."""
        header_skipper: Optional[HeaderSkipper] = None if self.header is None else HeaderSkipper(self.header)
        if self.footer is not None:
            index_last_line = LastLineDetector.detect_index(self.path_to_file, self.footer, self.encoding)
            reader_operator = self._create_record_before_footer_reader(index_last_line, header_skipper)
        else:
            reader_operator = self._create_no_footer_record_reader(header_skipper)
        return reader_operator

    def _create_no_footer_record_reader(self, header_skipper: Optional[HeaderSkipper]):
        if self.partition is not None:
            return PartitionSkipRecordReader(self.partition, before_task=header_skipper)
        return RecordReader(before_task=header_skipper)

    def _create_record_before_footer_reader(self, index_last_line: int, header_skipper: Optional[HeaderSkipper]):
        if self.partition is not None:
            return PartitionSkipRecordBeforeFooterReader(self.partition, index_last_line, before_task=header_skipper)
        return RecordBeforeFooterReader(index_last_line, before_task=header_skipper)