ducminh-phan/reformat-gherkin

View on GitHub
reformat_gherkin/core.py

Summary

Maintainability
A
1 hr
Test Coverage
import sys
import traceback
from io import TextIOWrapper
from pathlib import Path
from typing import BinaryIO, Iterable, Iterator, Set, Tuple, Union

from .ast_node import GherkinDocument
from .errors import (
    BaseError,
    EmptySources,
    EquivalentError,
    InternalError,
    NothingChanged,
    StableError,
)
from .formatter import LineGenerator
from .options import NewlineMode, Options, WriteBackMode
from .parser import parse
from .report import Report
from .utils import decode_stream, diff, dump_to_file, err, open_stream_or_path

# Issue tracker that the internal-error messages in this module point to.
REPORT_URL = "https://github.com/ducminh-phan/reformat-gherkin/issues"

# Line terminator to write for each newline mode that forces one. A mode
# that is not a key here makes the caller fall back to the newline detected
# in the input.
NEWLINE_FROM_OPTION = {
    NewlineMode.LF: "\n",
    NewlineMode.CRLF: "\r\n",
}


def find_sources(src: Iterable[str]) -> Set[Path]:
    """
    Collect the paths to reformat from the given path strings.

    Each entry is resolved to an absolute path first. A directory contributes
    every entry beneath it (recursively) that matches ``*.feature``; an
    existing file is taken as-is; anything else is reported through ``err``.
    """
    found: Set[Path] = set()

    for raw in src:
        resolved = Path(raw).resolve()

        if resolved.is_file():
            # Explicitly listed files are accepted whatever their extension.
            found.add(resolved)
            continue

        if resolved.is_dir():
            found |= set(resolved.rglob("*.feature"))
            continue

        err(f"Invalid path: {raw}")  # pragma: no cover

    return found


def reformat(src: Tuple[str, ...], report: Report, *, options: Options) -> None:
    """
    Reformat every source named in `src` and record each outcome on `report`.

    An entry equal to "-" selects standard input; every other entry is
    expanded by :func:`find_sources`. Raise EmptySources when no files were
    found and standard input was not requested.

    A failure while reformatting a file is recorded with `report.failed` and
    does not stop the remaining files. Errors raised while reformatting
    standard input are not caught here.
    """
    use_stdin = "-" in src
    sources = find_sources(s for s in src if s != "-")

    if not sources and not use_stdin:
        raise EmptySources

    if use_stdin:
        changed = reformat_stdin(options=options)
        report.done("stdin", changed)

    # A set has no stable iteration order; sort so that files are processed
    # and reported in the same order on every run.
    for path in sorted(sources):
        try:
            changed = reformat_single_file(path, options=options)
            report.done(str(path), changed)
        except Exception as e:
            # Deliberately broad: any failure on one file is reported and the
            # remaining files are still processed.
            report.failed(path, str(e))


def reformat_stdin(*, options: Options) -> bool:
    """
    Reformat the text read from standard input.

    The result is sent to standard output only when the write-back mode is
    INPLACE, and in that case it is written even if nothing changed
    (``force_write=True``). Return the value produced by
    :func:`reformat_stream_or_path`.
    """
    if options.write_back is WriteBackMode.INPLACE:
        destination = sys.stdout.buffer
    else:
        destination = None

    return reformat_stream_or_path(
        sys.stdin.buffer,
        destination,
        options=options,
        force_write=True,
    )


def reformat_single_file(path: Path, *, options: Options) -> bool:
    """
    Reformat the file at `path`.

    The file is overwritten only when the write-back mode is INPLACE;
    otherwise no output destination is passed on. Return the value produced
    by :func:`reformat_stream_or_path`.
    """
    write_in_place = options.write_back is WriteBackMode.INPLACE
    return reformat_stream_or_path(
        path,
        path if write_in_place else None,
        options=options,
    )


def reformat_stream_or_path(
    in_stream_or_path: Union[BinaryIO, Path],
    out_stream_or_path: Union[None, BinaryIO, Path],
    *,
    force_write: bool = False,
    options: Options,
) -> bool:
    """
    Reformat the input and optionally write the result to the output.

    The input is decoded with :func:`decode_stream`, which also yields its
    encoding and existing newline. The output, when one is given, is written
    with that encoding and with the newline selected by `options.newline`
    (falling back to the existing newline when the option maps to nothing).

    Nothing is written when `out_stream_or_path` is None. Otherwise the
    output is written if the contents changed, the newline changed, or
    `force_write` is set.

    Return True if either the contents or the newline changed.
    """
    with open_stream_or_path(in_stream_or_path, "rb") as source:
        src_contents, encoding, existing_newline = decode_stream(source)

    newline = NEWLINE_FROM_OPTION.get(options.newline, existing_newline)
    newline_changed = existing_newline != newline

    try:
        dst_contents = format_file_contents(src_contents, options=options)
    except NothingChanged:
        # Keep the original text so it can still be re-emitted, e.g. with a
        # different newline or because the caller forces a write.
        dst_contents, content_changed = src_contents, False
    else:
        content_changed = True

    changed = content_changed or newline_changed

    if out_stream_or_path is not None and (force_write or changed):
        with open_stream_or_path(out_stream_or_path, "wb") as sink:
            writer = TextIOWrapper(sink, encoding=encoding, newline=newline)
            writer.write(dst_contents)
            # Ensures that the underlying stream is not closed when the
            # TextIOWrapper is garbage collected. We don't want to close a
            # stream that was passed to us.
            writer.detach()

    return changed


def format_file_contents(src_contents: str, *, options: Options) -> str:
    """
    Return the reformatted version of `src_contents`.

    Raise NothingChanged when the input is empty or whitespace-only, or when
    reformatting yields exactly the input.

    Unless `options.fast` is set, the result is also checked with
    :func:`assert_equivalent` and :func:`assert_stable` before being returned.
    """
    if not src_contents.strip():
        raise NothingChanged

    formatted = format_str(src_contents, options=options)
    if formatted == src_contents:
        raise NothingChanged

    if options.fast:
        # Fast mode skips the safety checks below.
        return formatted

    assert_equivalent(src_contents, formatted)
    assert_stable(src_contents, formatted, options=options)
    return formatted


def format_str(src_contents: str, *, options: Options) -> str:
    """
    Parse `src_contents` and return the text regenerated from its AST.

    The lines produced by the LineGenerator are joined with "\\n".
    """
    document = parse(src_contents)
    generator = LineGenerator(
        document,
        options.step_keyword_alignment,
        options.tag_line_mode,
        options.indent,
    )
    return "\n".join(generator.generate())


def assert_equivalent(src: str, dst: str) -> None:
    """
    Check that `src` and `dst` parse to ASTs with the same content.

    Raise InternalError if `dst` cannot be parsed, and EquivalentError if the
    two ASTs differ. In both cases diagnostic output is written through
    ``dump_to_file`` and its return value is included in the error message.
    """

    def _render(ast: GherkinDocument) -> str:
        """
        Flatten an AST into text for comparison: one node repr per line
        """
        return "\n".join(repr(node) for node in ast)

    src_ast = parse(src)

    try:
        dst_ast = parse(dst)
    except BaseError as exc:
        trace = "".join(traceback.format_tb(exc.__traceback__))
        log = dump_to_file(trace, dst)
        raise InternalError(
            "INTERNAL ERROR: Invalid file contents are produced:\n"
            f"{exc}\n"
            f"Please report a bug on {REPORT_URL}.\n"
            "This invalid output might be helpful:\n"
            f"{log}\n"
        ) from exc

    src_ast_str = _render(src_ast)
    dst_ast_str = _render(dst_ast)

    if src_ast_str == dst_ast_str:
        return

    log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
    raise EquivalentError(
        "INTERNAL ERROR: The new content produced is not equivalent to "
        "the source.\n"
        f"Please report a bug on {REPORT_URL}.\n"
        f"This diff might be helpful: {log}\n"
    )


def assert_stable(src: str, dst: str, *, options: Options) -> None:
    """
    Check that reformatting `dst` again reproduces `dst` exactly.

    Raise StableError otherwise. The diffs source -> first pass and
    first pass -> second pass are written through ``dump_to_file`` and its
    return value is included in the error message.
    """
    second_pass = format_str(dst, options=options)
    if second_pass == dst:
        return

    first_diff = diff(src, dst, "source", "first pass")
    second_diff = diff(dst, second_pass, "first pass", "second pass")
    log = dump_to_file(first_diff, second_diff)

    raise StableError(
        "INTERNAL ERROR: Different contents are produced on the second pass "
        "of the formatter.\n"
        f"Please report a bug on {REPORT_URL}.\n"
        f"This diff might be helpful: {log}\n"
    ) from None