avocado-framework/avocado

View on GitHub
avocado/utils/diff_validator.py

Summary

Maintainability
B
5 hrs
Test Coverage
B
84%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; specifically version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# This code was inspired by the autotest project,
# client/shared/utils.py
# Authors: Plamen Dimitrov <plamen.dimitrov@intra2net.com>, Kristof Katus <kristof.katus@intra2net.com>

"""
Diff validator: Utility for testing file changes

Some typical use of this utility would be:

>>> import diff_validator
>>> change = diff_validator.Change()
>>> change.add_validated_files(["/etc/somerc"])
>>> change.append_expected_add("/etc/somerc", "this is a new line")
>>> change.append_expected_remove("/etc/somerc", "this line is removed")
>>> diff_validator.make_temp_file_copies(change.get_target_files())

After making changes through some in-test operation:

>>> changes = diff_validator.extract_changes(change.get_target_files())
>>> change_success = diff_validator.assert_change(changes, change.files_dict)

If test fails due to invalid change on the system:

>>> if not change_success:
...     changes = diff_validator.assert_change_dict(changes, change.files_dict)
...     raise diff_validator.DiffValidationError("Change is different than expected:\n%s" % diff_validator.create_diff_report(changes))
... else:
...     logging.info("Change made successfully")
>>> diff_validator.del_temp_file_copies(change.get_target_files())

"""

import difflib
import os
import shutil


class DiffValidationError(Exception):
    """Error type for reporting file changes that differ from the expected ones."""


def get_temp_file_path(file_path):
    """
    Builds the path of the temporary copy that belongs to a file.

    :param str file_path: path of the original file
    :returns: the original path with a ".tmp" suffix appended
    :rtype: str
    """
    temp_suffix = ".tmp"
    return file_path + temp_suffix


def make_temp_file_copies(file_paths):
    """
    Copies every provided file to its temporary counterpart.

    The destination of each copy is the path that `get_temp_file_path`
    returns for the original file.

    :param file_paths: paths of the files to copy
    :type file_paths: [str]
    """
    for source_path in file_paths:
        shutil.copyfile(source_path, get_temp_file_path(source_path))


def del_temp_file_copies(file_paths):
    """
    Removes the temporary copies of the provided files.

    Only the temporary version of each path (as given by
    `get_temp_file_path`) is removed, the provided paths themselves
    are not touched.

    :param file_paths: original file paths whose temporary versions are deleted
    :type file_paths: [str]
    """
    for original_path in file_paths:
        os.remove(get_temp_file_path(original_path))


def _hunk_line_counts(hunk_header):
    """
    Reads the line counts announced by a unified diff hunk header.

    :param str hunk_header: line of the form "@@ -start[,count] +start[,count] @@"
    :returns: number of old and new file lines contained in the hunk or
              (0, 0) if the header cannot be parsed
    :rtype: (int, int)
    """
    fields = hunk_header.split()
    if len(fields) < 3 or fields[1][:1] != "-" or fields[2][:1] != "+":
        return (0, 0)
    counts = []
    for line_range in fields[1:3]:
        _, separator, count = line_range[1:].partition(",")
        try:
            # a range without an explicit count covers exactly one line
            counts.append(max(int(count), 0) if separator else 1)
        except ValueError:
            return (0, 0)
    return (counts[0], counts[1])


def parse_unified_diff_output(lines):
    """
    Parses the unified diff output of two files.

    File header lines ("---"/"+++") and hunk headers ("@@") are skipped.
    The line counts of each hunk header are tracked so that changed lines
    whose own content starts with "++" or "--" (and therefore show up as
    "+++..." or "---..." in the diff) are not mistaken for file headers.
    Lines which are empty after trimming are not reported.

    :param lines: diff lines
    :type lines: [str]
    :returns: pair of adds and removes, where each is a list of trimmed lines
    :rtype: ([str], [str])
    """
    adds = []
    removes = []
    # lines of the old/new file still expected within the current hunk
    old_left = 0
    new_left = 0
    for line in lines:
        # line range information starts a new hunk
        if line.startswith("@@"):
            old_left, new_left = _hunk_line_counts(line)
            continue
        if line.startswith("+"):
            if new_left > 0:
                # inside a hunk this is always content, even if it looks like "+++"
                new_left -= 1
            elif line.startswith("+++"):
                # ignore filepaths in the output
                continue
            added_line = line[1:].strip()
            if added_line:
                adds.append(added_line)
        elif line.startswith("-"):
            if old_left > 0:
                # inside a hunk this is always content, even if it looks like "---"
                old_left -= 1
            elif line.startswith("---"):
                # ignore filepaths in the output
                continue
            removed_line = line[1:].strip()
            if removed_line:
                removes.append(removed_line)
        elif not line.startswith("\\"):
            # a context line belongs to both files; "\ No newline ..." markers to none
            old_left = max(old_left - 1, 0)
            new_left = max(new_left - 1, 0)
    return (adds, removes)


def extract_changes(file_paths, compared_file_paths=None):
    """
    Collects the added and removed lines of each file relative to its older version.

    The older version of a file is its temporary copy unless a custom file is
    provided at the same position of `compared_file_paths`.

    :param file_paths: original file paths (whose temporary versions will be retrieved)
    :type file_paths: [str]
    :param compared_file_paths: custom file paths to use instead of the temporary versions
    :type compared_file_paths: [str] or None
    :returns: file paths with corresponding diff information key-value pairs
    :rtype: {str, ([str], [str])}
    """
    if compared_file_paths is None:
        compared_file_paths = []

    changes = {}
    for index, current_path in enumerate(file_paths):
        backup_path = get_temp_file_path(current_path)
        # a custom file at the same position takes precedence over the temporary copy
        if index < len(compared_file_paths):
            reference_path = compared_file_paths[index]
        else:
            reference_path = backup_path

        with open(reference_path, encoding="utf-8") as reference_file:
            reference_lines = reference_file.readlines()
        with open(current_path, encoding="utf-8") as current_file:
            current_lines = current_file.readlines()

        # no context lines are requested, only the changed lines are of interest
        diff_lines = difflib.unified_diff(
            reference_lines,
            current_lines,
            fromfile=reference_path,
            tofile=current_path,
            n=0,
        )
        changes[current_path] = parse_unified_diff_output(diff_lines)
    return changes


def assert_change_dict(actual_result, expected_result):
    """
    Calculates the line changes that deviate from the expected ones.

    Every file of the actual result must also be a key of the expected result.
    Lines are compared as sets, so duplicates and ordering are not considered.

    :param actual_result: actual added and removed lines
    :type actual_result: {file_path, ([added_line, ...], [removed_line, ...])}
    :param expected_result: expected added and removed lines
    :type expected_result: {file_path, ([added_line, ...], [removed_line, ...])}
    :returns: detected differences as sorted groups of lines with filepath keys and a tuple of
              (unexpected_adds, not_present_adds, unexpected_removes, not_present_removes)
    :rtype: {str, ([str], [str], [str], [str])}
    """
    change_diffs = {}
    for file_path in actual_result:
        expected_changes = expected_result[file_path]
        actual_changes = actual_result[file_path]

        found_adds = set(actual_changes[0])
        found_removes = set(actual_changes[1])
        wanted_adds = set(expected_changes[0])
        wanted_removes = set(expected_changes[1])

        change_diffs[file_path] = (
            # added although not expected
            sorted(found_adds.difference(wanted_adds)),
            # expected to be added but missing
            sorted(wanted_adds.difference(found_adds)),
            # removed although not expected
            sorted(found_removes.difference(wanted_removes)),
            # expected to be removed but still present
            sorted(wanted_removes.difference(found_removes)),
        )

    return change_diffs


def assert_change(actual_result, expected_result):
    """
    Checks whether the actual changes are exactly the expected ones.

    This is a boolean wrapper around `assert_change_dict`.

    :param actual_result: actual added and removed lines with filepath keys and a tuple of
                          ([added_line, ...], [removed_line, ...])
    :type actual_result: {str, ([str], [str])}
    :param expected_result: expected added and removed lines of type as the actual result
    :type expected_result: {str, ([str], [str])}
    :returns: True if no unexpected or missing line change was found for any file,
              False otherwise
    :rtype: bool
    """
    change_diffs = assert_change_dict(actual_result, expected_result)
    return not any(
        len(line_change) != 0
        for file_change in change_diffs.values()
        for line_change in file_change
    )


def create_diff_report(change_diffs):
    """
    Renders the detected differences as a human readable report.

    Files without any difference are left out. Each reported file starts with
    a "---"/"+++" header pair followed by one titled section per non-empty
    category. Reported lines are escaped with the "unicode_escape" codec.

    :param change_diffs: detected differences as groups of lines with filepath keys and a tuple of
                         (unexpected_adds, not_present_adds, unexpected_removes, not_present_removes)
    :type: {str, ([str], [str], [str], [str])}
    :returns: print string of the line differences
    :rtype: str
    """
    # section titles in the order of the categories within each tuple
    category_titles = (
        "*++ Additional unexpected adds",
        "/++ Not present expected adds",
        "*-- Additional unexpected removes",
        "/-- Not present expected removes",
    )
    report_lines = []
    for file_path, change_diff in change_diffs.items():
        if not any(change_diff[position] for position in range(4)):
            continue
        report_lines.extend(
            (f"--- {get_temp_file_path(file_path)}", f"+++ {file_path}")
        )
        for position, title in enumerate(category_titles):
            category_lines = change_diff[position]
            if category_lines:
                report_lines.append(title)
            report_lines.extend(
                str(entry).encode("unicode_escape").decode()
                for entry in category_lines
            )
    return "\n".join(report_lines)


class Change:
    """Collects the expected line additions and removals per validated file."""

    def __init__(self):
        """Creates a change object without any validated files."""
        # maps a file path to a pair of (expected added lines, expected removed lines)
        self.files_dict = {}

    def get_target_files(self):
        """Return the paths of all files known to this change."""
        return list(self.files_dict)

    def add_validated_files(self, filenames):
        """
        Register files with empty expectations.

        Expectations already stored for one of the files are replaced
        by empty ones.

        :param filenames: files to validate
        :type filenames: [str]
        """
        self.files_dict.update((filename, ([], [])) for filename in filenames)

    def append_expected_add(self, filename, line):
        """
        Append an expected added line to a file.

        The file is registered first if it is not known yet.

        :param str filename: file to append to
        :param str line: line to append to as an expected addition
        """
        expected_adds = self.files_dict.setdefault(filename, ([], []))[0]
        expected_adds.append(line)

    def append_expected_remove(self, filename, line):
        """
        Append an expected removed line to a file.

        The file is registered first if it is not known yet.

        :param str filename: file to append to
        :param str line: line to append to as an expected removal
        """
        expected_removes = self.files_dict.setdefault(filename, ([], []))[1]
        expected_removes.append(line)

    def get_all_adds(self):
        """Return a list of the added lines for all validated files."""
        return [
            added_line
            for expected in self.files_dict.values()
            for added_line in expected[0]
        ]

    def get_all_removes(self):
        """Return a list of the removed lines for all validated files."""
        return [
            removed_line
            for expected in self.files_dict.values()
            for removed_line in expected[1]
        ]