# KarrLab/obj_tables: obj_tables/migrate.py
""" Support schema migration

:Author: Arthur Goldberg <Arthur.Goldberg@mssm.edu>
:Date: 2018-11-18
:Copyright: 2018, Karr Lab
:License: MIT
"""

from abc import ABC, abstractmethod
from cement import Controller
from enum import Enum
from modulefinder import ModuleFinder
from networkx.algorithms.dag import topological_sort, ancestors
from pathlib import Path, PurePath
from pprint import pformat
from urllib.parse import urlparse
from warnings import warn
import argparse
import cement
import collections.abc
import copy
import datetime
import git
import importlib
import importlib.util
import inspect
import networkx as nx
import os
import re
import shutil
import sys
import tempfile
import warnings
import yaml

from obj_tables import TableFormat, RelatedAttribute, get_models, utils
from obj_tables.io import WorkbookReader, Reader, Writer
from obj_tables.utils import SchemaRepoMetadata
from wc_utils.config.core import AltResourceName
from wc_utils.util.files import normalize_filename, remove_silently
from wc_utils.util.list import det_find_dupes, det_count_elements, dict_by_class
import obj_tables
import obj_tables.abstract
import wc_utils

# todo: more intuitive expression of renamed_attributes as [ExistingModelName.existing_attr_name, MigratedModelName.migrated_attr_name]
# todo: automatically retry git requests, perhaps using the requests package


class MigratorError(Exception):
    """ Exception raised for errors in obj_tables.migrate

    Attributes:
        message (:obj:`str`): the exception's message
    """

    def __init__(self, message=None):
        super().__init__(message)


class MigrateWarning(UserWarning):
    """ Migrate warning """
    pass


class SchemaModule(object):
    """ Represent and import a schema module

    Attributes:
        module_path (:obj:`str`): path to the module
        abs_module_path (:obj:`str`): absolute path to the module
        directory (:obj:`str`): if the module is in a package, the path to the package's directory;
            otherwise the directory containing the module
        package_name (:obj:`str`): if the module is in a package, the name of the package containing the
            module; otherwise `None`
        module_name (:obj:`str`): the module's module name
    """

    # cached schema modules that have been imported, indexed by full pathnames
    MODULES = {}

    def __init__(self, module_path, dir=None):
        """ Initialize a `SchemaModule`

        Args:
            module_path (:obj:`str`): path to the module
            dir (:obj:`str`, optional): a directory that contains `self.module_path`
        """
        self.module_path = module_path
        self.abs_module_path = normalize_filename(self.module_path, dir=dir)
        self.directory, self.package_name, self.module_name = self.parse_module_path(self.abs_module_path)

    def get_path(self):
        """ Provide the absolute path of the module

        Returns:
            :obj:`str`: the absolute path of the module
        """
        return str(self.abs_module_path)

    @staticmethod
    def parse_module_path(module_path):
        """ Parse the path to a module

        If the module is not in a package, provide its directory and module name.
        If the module is in a package, provide its directory, package name and module name.
        The directory can be used as a `sys.path` entry.

        Args:
            module_path (:obj:`str`): path of a Python module file

        Returns:
            :obj:`tuple`: a triple containing directory, package name and module name, as described
                above. If the module is not in a package, then package name is `None`.

        Raises:
            :obj:`MigratorError`: if `module_path` is not the name of a Python file, or is not a file
        """
        path = Path(module_path)

        if path.suffixes != ['.py']:
            raise MigratorError("'{}' is not a Python source file name".format(module_path))
        if not path.is_file():
            raise MigratorError("'{}' is not a file".format(module_path))

        # go up directory hierarchy from path and get first directory that does not contain '__init__.py'
        try:
            package_root = AltResourceName.get_package_root(module_path)
            dir = os.path.dirname(package_root)
            package_name = str(path.relative_to(dir).parent).replace('/', '.')
            module_name = package_name + '.' + path.stem
            return dir, package_name, module_name

        except ValueError:
            module_directory = path.parent
            module_name = path.stem
            return str(module_directory), None, module_name
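
    # For example (paths are illustrative): parsing '/repo/pkg/sub/schema.py', where '/repo/pkg'
    # and '/repo/pkg/sub' each contain an '__init__.py', yields ('/repo', 'pkg.sub', 'pkg.sub.schema');
    # parsing a standalone '/tmp/schema.py' yields ('/tmp', None, 'schema').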

    # suffix for munged model names
    # include whitespace so munged Model names cannot collide with actual Model names
    MUNGED_MODEL_NAME_SUFFIX = '_MUNGED WITH SPACES'
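    # e.g., a Model subclass named 'Species' is munged to 'Species_MUNGED WITH SPACES'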

    def in_package(self):
        """ Is the schema in a package

        Returns:
            :obj:`bool`: whether the schema is in a package
        """
        return self.package_name is not None

    @staticmethod
    def _munged_model_name(model):
        """ Provide munged name for `model`

        If `model`'s name is already munged, return the name.

        Args:
            model (:obj:`obj_tables.Model`): a model

        Returns:
            :obj:`str`: a munged name for model, made by appending `SchemaModule.MUNGED_MODEL_NAME_SUFFIX`
        """
        if not SchemaModule._model_name_is_munged(model):
            return "{}{}".format(model.__name__, SchemaModule.MUNGED_MODEL_NAME_SUFFIX)
        else:
            return model.__name__

    @staticmethod
    def _unmunged_model_name(model):
        """ Provide unmunged name for `model`

        If `model`'s name isn't munged, return the name.

        Args:
            model (:obj:`obj_tables.Model`): a model

        Returns:
            :obj:`str`: an unmunged name for `model`, made by removing the suffix
                `SchemaModule.MUNGED_MODEL_NAME_SUFFIX`
        """
        if SchemaModule._model_name_is_munged(model):
            return model.__name__[:-len(SchemaModule.MUNGED_MODEL_NAME_SUFFIX)]
        else:
            return model.__name__

    @staticmethod
    def _model_name_is_munged(model):
        """ Is `model`'s name munged

        Args:
            model (:obj:`obj_tables.Model`): a model

        Returns:
            :obj:`bool`: :obj:`True` if `model`'s name is munged
        """
        return model.__name__.endswith(SchemaModule.MUNGED_MODEL_NAME_SUFFIX)

    @staticmethod
    def _munge_all_model_names():
        """ Munge the names of all models, so the models cannot be found by name and reused
        """
        for model in get_models():
            model.__name__ = SchemaModule._munged_model_name(model)

    @staticmethod
    def _unmunge_all_munged_model_names():
        """ Unmunge the names of all models so they can be used, inverting `_munge_all_model_names`
        """
        for model in get_models():
            model.__name__ = SchemaModule._unmunged_model_name(model)

    def import_module_for_migration(self, validate=True, required_attrs=None, debug=False,
                                    mod_patterns=None, print_code=False):
        """ Import a schema from a Python module in a file, which may be in a package

        Args:
            validate (:obj:`bool`, optional): whether to validate the module; default is :obj:`True`
            required_attrs (:obj:`list` of :obj:`str`, optional): list of attributes that must be
                present in the imported module
            debug (:obj:`bool`, optional): if :obj:`True`, print debugging output; default is :obj:`False`
            mod_patterns (:obj:`list` of :obj:`str`, optional): RE patterns used to search for
                modules in `sys.modules`; modules whose names match a pattern
                are output when `debug` is :obj:`True`
            print_code (:obj:`bool`, optional): if :obj:`True`, while debugging print code being imported;
                default is :obj:`False`

        Returns:
            :obj:`Module`: the `Module` loaded from `self.module_path`

        Raises:
            :obj:`MigratorError`: if one of the following conditions is met:

                * The schema at `self.module_path` cannot be imported
                * `validate` is :obj:`True` and any related attribute in any model references a model
                  not in the module
                * The module is missing a required attribute
        """
        if debug:
            print('\nimport_module_for_migration', self.module_name)
            if SchemaModule.MODULES:
                print('SchemaModule.MODULES:')
                for path, module in SchemaModule.MODULES.items():
                    name = module.__name__
                    package = module.__package__
                    if package:
                        name = package + '.' + name
                    print('\t', path, name)
            else:
                print('no SchemaModule.MODULES')

        # if a schema has already been imported, return its module so each schema has one internal representation
        if self.get_path() in SchemaModule.MODULES:
            if debug:
                print('reusing', self.get_path())
            return SchemaModule.MODULES[self.get_path()]

        # temporarily munge names of all models so they're not reused
        SchemaModule._munge_all_model_names()

        # copy sys.path so it can be restored & sys.modules so new modules in self.module can be deleted
        sys_attrs = ['path', 'modules']
        saved = {}
        for sys_attr in sys_attrs:
            saved[sys_attr] = getattr(sys, sys_attr).copy()
        # temporarily put the directory holding the module being imported on sys.path
        sys.path.insert(0, self.directory)

        # todo: is this suspension of check that related attribute names don't clash really needed?
        obj_tables.core.ModelMeta.CHECK_SAME_RELATED_ATTRIBUTE_NAME = False

        def print_file(fn, max_lines=100):
            print('\timporting: {}:'.format(fn))
            with open(fn, 'r') as f:
                for n, line in enumerate(f, start=1):
                    print('\t\t', line, end='')
                    if max_lines <= n:
                        print('\tExceeded max ({}) lines'.format(max_lines))
                        break
            print()

        if debug:
            print('path:', self.get_path())
            targets = set('ChemicalStructure MolecularStructure Parameter DataValue'.split())
            with open(self.get_path(), 'r') as f:
                for i, line in enumerate(f, start=1):
                    for target in targets:
                        if 'class ' + target in line:
                            print('\t', i, ':', line, end='')
                            break

        if debug:
            print("\n== importing {} from '{}' ==".format(self.module_name, self.directory))
            if print_code:
                if '.' in self.module_name:
                    nested_modules = self.module_name.split('.')
                    for i in range(len(nested_modules)):
                        path = os.path.join(self.directory, *nested_modules[:i], '__init__.py')
                        if os.path.isfile(path):
                            print_file(path)
                print_file(self.get_path())

            if mod_patterns:
                if not isinstance(mod_patterns, collections.abc.Iterable) or isinstance(mod_patterns, str):
                    raise MigratorError(
                        "mod_patterns must be an iterator that's not a string; but it is a(n) '{}'".format(
                            type(mod_patterns).__name__))
                print("sys.modules entries matching RE patterns: '{}':".format("', '".join(mod_patterns)))
                compiled_mod_patterns = [re.compile(mod_pattern) for mod_pattern in mod_patterns]
                for name, module in sys.modules.items():
                    for compiled_mod_pattern in compiled_mod_patterns:
                        if compiled_mod_pattern.search(name):
                            if hasattr(module, '__file__'):
                                print('\t', name, module.__file__)
                            else:
                                print('\t', name)
                            break

            print('sys.path:')
            for p in sys.path:
                print('\t', p)

        try:
            new_module = importlib.import_module(self.module_name)

            if debug:
                models_found = set()
                names = set()
                for name, cls in inspect.getmembers(new_module, inspect.isclass):
                    names.add(name)
                    if issubclass(cls, obj_tables.core.Model) and \
                            cls not in {obj_tables.Model, obj_tables.abstract.AbstractModel}:
                        models_found.add(name)
                print('targets in names', names.intersection(targets))
                print('targets in models_found', models_found.intersection(targets))

        except (SyntaxError, ImportError, AttributeError, ValueError, NameError) as e:
            raise MigratorError("'{}' cannot be imported and exec'ed: {}: {}".format(
                self.get_path(), e.__class__.__name__, e))
        finally:

            # reset global variable
            obj_tables.core.ModelMeta.CHECK_SAME_RELATED_ATTRIBUTE_NAME = True

            # unmunge names of all models, so they're normal for all other wc code
            SchemaModule._unmunge_all_munged_model_names()

            # to avoid accessing the schema via other import statements restore sys.path
            sys.path = saved['path']

            # sort the set difference to make this code deterministic
            new_modules = sorted(set(sys.modules) - set(saved['modules']))
            if debug:
                if new_modules:
                    print('new modules:')
                    for k in new_modules:
                        module = sys.modules[k]
                        name = getattr(module, '__name__')
                        package = getattr(module, '__package__')
                        if package:
                            name = package + '.' + name
                        file = module.__file__ if hasattr(module, '__file__') else ''
                        print('\t', name, file)

                changed_modules = [k for k in set(sys.modules) & set(saved['modules'])
                                   if sys.modules[k] != saved['modules'][k]]
                if changed_modules:
                    print('changed modules:')
                    for k in changed_modules:
                        print(k)

            # to enable the loading of many versions of schema modules, remove them from sys.modules
            # to improve performance, do not remove other modules
            def first_component_module_name(module_name):
                return module_name.split('.')[0]
            first_component_imported_module = first_component_module_name(self.module_name)
            for k in new_modules:
                if first_component_module_name(k) == first_component_imported_module:
                    del sys.modules[k]
            # leave changed modules alone

        if required_attrs:
            # module must have the required attributes
            errors = []
            for required_attr in required_attrs:
                if not hasattr(new_module, required_attr):
                    errors.append("module in '{}' missing required attribute '{}'".format(
                        self.get_path(), required_attr))
            if errors:
                raise MigratorError('\n'.join(errors))

        if validate:
            errors = self._check_imported_models(module=new_module)
            if errors:
                raise MigratorError('\n'.join(errors))

        # save an imported schema in SchemaModule.MODULES, indexed by its path
        # results will be unpredictable if different code is imported from the same path
        SchemaModule.MODULES[self.get_path()] = new_module

        return new_module

    def _get_model_defs(self, module):
        """ Obtain the `obj_tables.Model`\ s in a module

        Args:
            module (:obj:`Module`): a `Module` containing subclasses of `obj_tables.Model`

        Returns:
            :obj:`dict`: the Models in a module

        Raises:
            :obj:`MigratorError`: if no subclasses of `obj_tables.Model` are found
        """
        models = {}
        for name, cls in inspect.getmembers(module, inspect.isclass):
            if issubclass(cls, obj_tables.core.Model) and \
                    cls not in {obj_tables.Model, obj_tables.abstract.AbstractModel}:
                models[name] = cls
        # ensure that a schema contains some obj_tables.Models
        if not models:
            raise MigratorError("No subclasses of obj_tables.Model found in '{}'".format(self.abs_module_path))

        # start: temporary hack until all references to GitMetadata are removed from test repos
        # todo: rebuild obj_tables_test_schema_repo & obj_tables_test_migration_repo without references to GitMetadata
        to_delete = []
        for model in models.keys():
            if model.endswith('GitMetadata'):
                to_delete.append(model)
        for model in to_delete:
            del models[model]
        # end: temporary hack
        return models

    def _check_imported_models(self, module=None):
        """ Check consistency of an imported module

        Args:
            module (:obj:`Module`, optional): a `Module` containing subclasses of `obj_tables.Model`;
                if not provided, the module is imported

        Returns:
            :obj:`list`: errors in the module
        """
        module = self.import_module_for_migration() if module is None else module
        model_defs = self._get_model_defs(module)

        # ensure that all RelatedAttributes in all models reference models in the module
        errors = []
        for model_name, model in model_defs.items():
            for attr_name, local_attr in model.Meta.local_attributes.items():

                if isinstance(local_attr.attr, RelatedAttribute):
                    related_class = local_attr.related_class
                    # use .get() so a related class missing from the module is reported as an
                    # error rather than raising a KeyError
                    if model_defs.get(related_class.__name__) is not related_class:
                        errors.append("{}.{} references a {}, but it's not the model in module {}".format(
                            model_name, attr_name, related_class.__name__, module.__name__))
        return errors

    def run(self):
        """ Import a schema and provide its `obj_tables.Model`\ s

        Returns:
            :obj:`dict`: the imported Models

        Raises:
            :obj:`MigratorError`: if `self.module_path` cannot be loaded
        """
        module = self.import_module_for_migration()
        return self._get_model_defs(module)
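
    # A minimal usage sketch (the path is illustrative):
    #
    #   model_defs = SchemaModule('my_repo/my_pkg/core.py').run()
    #   # model_defs maps each Model subclass name in the schema to its class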

    def __str__(self):
        vals = []
        for attr in ['module_path', 'abs_module_path', 'directory', 'package_name', 'module_name']:
            vals.append("{}: {}".format(attr, getattr(self, attr)))
        return '\n'.join(vals)


class Migrator(object):
    """ Support schema migration

    Attributes:
        existing_schema (:obj:`SchemaModule`): the existing schema, and its properties
        migrated_schema (:obj:`SchemaModule`): the migrated schema, and its properties
        existing_defs (:obj:`dict`): `obj_tables.Model` definitions of the existing models, keyed by name
        migrated_defs (:obj:`dict`): `obj_tables.Model` definitions of the migrated models, keyed by name
        deleted_models (:obj:`set`): model types defined in the existing models but not the migrated models
        renamed_models (:obj:`list` of :obj:`tuple`): model types renamed from the existing to the
            migrated schema
        models_map (:obj:`dict`): map from existing model names to migrated model names
        renamed_attributes (:obj:`list` of :obj:`tuple`): attribute names renamed from the existing to
            the migrated schema
        renamed_attributes_map (:obj:`dict`): map of attribute names renamed from the existing to the
            migrated schema
        _migrated_copy_attr_name (:obj:`str`): attribute name used to point existing models to corresponding
            migrated models; not used in any existing schema
        io_classes (:obj:`dict` of :obj:`type`): reader and writer for I/O of existing and migrated files,
            respectively; defaults provided in `DEFAULT_IO_CLASSES`
        transformations (:obj:`MigrationWrapper`): optional transformations which modify models before
            and/or after migration
    """

    SCALAR_ATTRS = ['deleted_models', '_migrated_copy_attr_name']
    COLLECTIONS_ATTRS = ['existing_defs', 'migrated_defs', 'renamed_models', 'models_map',
                         'renamed_attributes', 'renamed_attributes_map']

    # default suffix for a migrated model file
    MIGRATE_SUFFIX = '_migrated'

    # modules being used for migration, indexed by full pathname
    # Migrator does not need or support packages
    modules = {}

    # prefix of attribute name used to connect existing and migrated models during migration
    MIGRATED_COPY_ATTR_PREFIX = '__migrated_copy'

    # the name of the attribute used in expression Models to hold their ParsedExpressions
    PARSED_EXPR = '_parsed_expression'

    # default R/W methods are the obj_tables.io methods
    DEFAULT_IO_CLASSES = dict(reader=Reader, writer=Writer)

    def __init__(self, existing_defs_file=None, migrated_defs_file=None, renamed_models=None,
                 renamed_attributes=None, io_classes=None, transformations=None):
        """ Construct a Migrator

        If it's defined, `transformations` uses its `prepare_existing_models` method to modify existing
        models just before they are migrated and uses its `modify_migrated_models` method to modify
        migrated models just after they are migrated.
        A different `transformations` is associated with each granular step in a sequence
        of migrations (see :obj:`SchemaChanges` below) so that different transformations can wrap migration
        in each step.

        Args:
            existing_defs_file (:obj:`str`, optional): path of a file containing existing Model definitions
            migrated_defs_file (:obj:`str`, optional): path of a file containing migrated Model definitions;
                filenames optional so that `Migrator` can use models defined in memory
            renamed_models (:obj:`list` of :obj:`tuple`, optional): model types renamed from the existing to the
                migrated schema; has the form `[('Existing_1', 'Migrated_1'), ..., ('Existing_n', 'Migrated_n')]`,
                where `('Existing_i', 'Migrated_i')` indicates that existing model `Existing_i` is
                being renamed into migrated model `Migrated_i`.
            renamed_attributes (:obj:`list` of :obj:`tuple`, optional): attribute names renamed from the existing
                to the migrated schema; a list of tuples of the form
                `(('Existing_Model_i', 'Existing_Attr_x'), ('Migrated_Model_j', 'Migrated_Attr_y'))`,
                which indicates that `Existing_Model_i.Existing_Attr_x` will migrate to
                `Migrated_Model_j.Migrated_Attr_y`
            io_classes (:obj:`dict` of :obj:`type`, optional): reader and/or writer for I/O of existing and
                migrated files, respectively; if provided, overrides defaults provided in `DEFAULT_IO_CLASSES`
            transformations (:obj:`MigrationWrapper`, optional): transformations which modify models
                before and/or after migration
        """
        self.existing_schema = SchemaModule(existing_defs_file) if existing_defs_file else None
        self.migrated_schema = SchemaModule(migrated_defs_file) if migrated_defs_file else None
        self.renamed_models = renamed_models if renamed_models else []
        self.renamed_attributes = renamed_attributes if renamed_attributes else []
        self.io_classes = self.DEFAULT_IO_CLASSES
        if io_classes:
            self.io_classes = {**self.DEFAULT_IO_CLASSES, **io_classes}
        self.transformations = transformations
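
    # A hypothetical usage sketch (schema paths, model names, and attribute names are illustrative):
    #
    #   migrator = Migrator(existing_defs_file='old_schema/core.py',
    #                       migrated_defs_file='new_schema/core.py',
    #                       renamed_models=[('Compound', 'Molecule')],
    #                       renamed_attributes=[(('Compound', 'mass'),
    #                                            ('Molecule', 'molecular_weight'))])
    #   migrator.prepare()
    #   migrated_file = migrator.full_migrate('compounds.xlsx')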

    def _load_defs_from_files(self):
        """ Initialize a `Migrator`\ s schemas from files

        Distinct from `prepare` so most of `Migrator` can be tested with models defined in code

        Returns:
            :obj:`Migrator`: `self`, to enable chaining
        """
        if self.existing_schema:
            self.existing_defs = self.existing_schema.run()
        if self.migrated_schema:
            self.migrated_defs = self.migrated_schema.run()
        return self

    def _get_migrated_copy_attr_name(self):
        """ Obtain name of attribute used in an existing model to reference its migrated model

        Returns:
            :obj:`str`: attribute name for a migrated copy
        """
        max_len = 0
        for existing_model_def in self.existing_defs.values():
            for attr in existing_model_def.Meta.attributes.values():
                max_len = max(max_len, len(attr.name))
        return "{}{}".format(Migrator.MIGRATED_COPY_ATTR_PREFIX,
                             '_' * (max_len + 1 - len(Migrator.MIGRATED_COPY_ATTR_PREFIX)))
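
    # e.g., if the longest attribute name in the existing schema has 20 characters, the returned
    # name is '__migrated_copy______' (21 characters), so it cannot collide with any existing
    # attribute name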

    def _validate_renamed_models(self):
        """ Validate renamed models

        Ensure that renamed models:

        * properly reference the existing and migrated schemas, and
        * don't use an existing or migrated model in more than one mapping

        If the renamed models validate, then create a map from existing to migrated models.

        Returns:
            :obj:`list` of :obj:`str`: errors in the renamed models
        """
        self.models_map = {}
        errors = []
        # check renamed models
        for existing_model, migrated_model in self.renamed_models:
            if existing_model not in self.existing_defs:
                errors.append("'{}' in renamed models not an existing model".format(existing_model))
            if migrated_model not in self.migrated_defs:
                errors.append("'{}' in renamed models not a migrated model".format(migrated_model))
        duped_existing_models = det_find_dupes([existing_model for existing_model, _ in self.renamed_models])
        if duped_existing_models:
            errors.append("duplicated existing models in renamed models: '{}'".format(duped_existing_models))
        duped_migrated_models = det_find_dupes([migrated_model for _, migrated_model in self.renamed_models])
        if duped_migrated_models:
            errors.append("duplicated migrated models in renamed models: '{}'".format(duped_migrated_models))
        if errors:
            return errors

        # create self.models_map which maps existing model names to migrated model names
        # assumes that existing and migrated models with the same name correspond unless they're specified
        # in `self.renamed_models` or not migrated
        self.models_map = dict(self.renamed_models)
        for existing_model in self.existing_defs:
            if existing_model not in self.models_map and existing_model in self.migrated_defs:
                self.models_map[existing_model] = existing_model

        return errors

    def _validate_renamed_attrs(self):
        """ Validate renamed attributes

        Ensure that renamed attributes:

        * properly reference the existing and migrated schemas, and
        * don't use an existing or migrated attribute in more than one mapping

        If the renamed attributes validate, create a map from existing to migrated attributes.

        Returns:
            :obj:`list` of :obj:`str`: errors in the renamed attributes
        """
        self.renamed_attributes_map = {}
        errors = []

        # check renamed attributes
        for (existing_model, existing_attr), (migrated_model, migrated_attr) in self.renamed_attributes:
            if existing_model not in self.existing_defs or \
                    existing_attr not in self.existing_defs[existing_model].Meta.attributes:
                errors.append("'{}.{}' in renamed attributes not an existing model.attribute".format(
                    existing_model, existing_attr))
            if migrated_model not in self.migrated_defs or \
                    migrated_attr not in self.migrated_defs[migrated_model].Meta.attributes:
                errors.append("'{}.{}' in renamed attributes not a migrated model.attribute".format(
                    migrated_model, migrated_attr))
            # renamed attributes must be consistent with renamed models
            # i.e., if an attribute is renamed A.x -> B.y then model A must be renamed A -> B
            if existing_model not in self.models_map or \
                    migrated_model != self.models_map[existing_model]:
                errors.append("renamed attribute '{}.{} -> {}.{}' not consistent with renamed models".format(
                    existing_model, existing_attr, migrated_model, migrated_attr))
        duped_existing_attributes = det_find_dupes([existing_attr for existing_attr, _ in self.renamed_attributes])
        if duped_existing_attributes:
            errors.append("duplicated existing attributes in renamed attributes: '{}'".format(
                duped_existing_attributes))
        duped_migrated_attributes = det_find_dupes([migrated_attr for _, migrated_attr in self.renamed_attributes])
        if duped_migrated_attributes:
            errors.append("duplicated migrated attributes in renamed attributes: '{}'".format(
                duped_migrated_attributes))

        if errors:
            return errors

        # create self.renamed_attributes_map which maps renamed existing model, attribute pairs to
        # their corresponding migrated model, attribute pairs
        self.renamed_attributes_map = dict(self.renamed_attributes)
        return errors

    def _get_mapped_attribute(self, existing_class, existing_attribute):
        """ Get the corresponding migrated class and attribute for the existing class and attribute

        Args:
            existing_class (:obj:`obj_tables.core.ModelMeta` or :obj:`str`): an existing class
            existing_attribute (:obj:`obj_tables.core.ModelMeta` or :obj:`str`): an attribute in
                `existing_class`

        Returns:
            :obj:`tuple` of :obj:`str`: the pair (migrated model name, migrated attr name); return (None, None)
                if the existing class and attribute do not map to a migrated class and attribute
        """

        # convert existing_class and existing_attribute to strings
        if isinstance(existing_class, obj_tables.core.ModelMeta):
            existing_class = existing_class.__name__
        if isinstance(existing_attribute, obj_tables.Attribute):
            existing_attribute = existing_attribute.name

        # try to map existing_class, existing_attribute in renamed attributes
        if (existing_class, existing_attribute) in self.renamed_attributes_map:
            return self.renamed_attributes_map[(existing_class, existing_attribute)]

        # otherwise try to map existing_class as an unchanged model in self.models_map
        if existing_class in self.models_map:
            migrated_class = self.models_map[existing_class]
            if existing_attribute in self.migrated_defs[migrated_class].Meta.attributes:
                return (migrated_class, self.migrated_defs[migrated_class].Meta.attributes[existing_attribute].name)

        return (None, None)
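
    # For example (names are illustrative): with renamed_attributes_map
    # {('Compound', 'mass'): ('Molecule', 'molecular_weight')}, calling
    # _get_mapped_attribute('Compound', 'mass') returns ('Molecule', 'molecular_weight'),
    # while an existing class or attribute that isn't migrated returns (None, None)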

    def prepare(self):
        """ Prepare for migration

        Raises:
            :obj:`MigratorError`: if renamings are not valid, or
                inconsistencies exist between corresponding existing and migrated classes
        """
        # load schemas from files
        self._load_defs_from_files()

        # validate model and attribute rename specifications
        errors = self._validate_renamed_models()
        if errors:
            raise MigratorError('\n'.join(errors))
        errors = self._validate_renamed_attrs()
        if errors:
            raise MigratorError('\n'.join(errors))

        # find deleted models
        self.deleted_models = set(self.existing_defs).difference(self.models_map)

        # check that corresponding models in existing and migrated are consistent
        inconsistencies = []
        for existing_model, migrated_model in self.models_map.items():
            inconsistencies.extend(self._get_inconsistencies(existing_model, migrated_model))
        if inconsistencies:
            raise MigratorError('\n' + '\n'.join(inconsistencies))

        # get attribute name not used in existing schemas so that existing models can point to migrated models
        self._migrated_copy_attr_name = self._get_migrated_copy_attr_name()

    def _get_inconsistencies(self, existing_model, migrated_model):
        """ Detect inconsistencies between `existing_model` and `migrated_model` model classes

        Detect inconsistencies between `existing_model` and `migrated_model`. Inconsistencies arise if the loaded `existing_model`
        or `migrated_model` definitions are not consistent with their model or attribute renaming specifications
        or with each other.

        Args:
            existing_model (:obj:`str`): name of an existing model class
            migrated_model (:obj:`str`): name of the corresponding migrated model class

        Returns:
            :obj:`list`: inconsistencies between existing_model_cls and migrated_model_cls; an empty list if
                no inconsistencies exist
        """
        inconsistencies = []

        # constraint: existing_model and migrated_model must be available in their respective schemas
        path = "'{}'".format(self.existing_schema.get_path()) if self.existing_schema \
            else 'existing models definitions'
        if existing_model not in self.existing_defs:
            inconsistencies.append("existing model {} not found in {}".format(existing_model, path))
        path = "'{}'".format(self.migrated_schema.get_path()) if self.migrated_schema \
            else 'migrated models definitions'
        if migrated_model not in self.migrated_defs:
            inconsistencies.append("migrated model {} corresponding to existing model {} not found in {}".format(
                migrated_model, existing_model, path))
        if inconsistencies:
            # return these inconsistencies because they prevent checks below from running accurately
            return inconsistencies

        # constraint: existing_model and migrated_model must have the same type, which will be obj_tables.core.ModelMeta
        existing_model_cls = self.existing_defs[existing_model]
        migrated_model_cls = self.migrated_defs[migrated_model]
        if type(existing_model_cls) != type(migrated_model_cls):
            inconsistencies.append("type of existing model '{}' doesn't equal type of migrated model '{}'".format(
                type(existing_model_cls).__name__, type(migrated_model_cls).__name__))

        # constraint: names of existing_model and migrated_model classes must match their names in the models map
        if existing_model_cls.__name__ != existing_model:
            inconsistencies.append("name of existing model class '{}' not equal to its name in the models map '{}'".format(
                existing_model_cls.__name__, existing_model))
        if migrated_model_cls.__name__ != migrated_model:
            inconsistencies.append("name of migrated model class '{}' not equal to its name in the models map '{}'".format(
                migrated_model_cls.__name__, migrated_model))
        expected_migrated_model_name = self.models_map[existing_model]
        if migrated_model_cls.__name__ != expected_migrated_model_name:
            inconsistencies.append("models map says '{}' migrates to '{}', but _get_inconsistencies parameters "
                                   "say '{}' migrates to '{}'".format(existing_model, expected_migrated_model_name, existing_model,
                                                                      migrated_model))
        if inconsistencies:
            # given these inconsistencies the checks below would not be informative
            return inconsistencies

        # constraint: the types of attributes in existing_model and migrated_model classes must match
        for existing_attr_name, existing_attr in existing_model_cls.Meta.attributes.items():
            migrated_class, migrated_attr = self._get_mapped_attribute(existing_model, existing_attr_name)
            # skip if the attr isn't migrated
            if migrated_attr:
                migrated_attr = migrated_model_cls.Meta.attributes[migrated_attr]
                if type(existing_attr).__name__ != type(migrated_attr).__name__:
                    inconsistencies.append("existing attribute {}.{} type {} differs from its "
                                           "migrated attribute {}.{} type {}".format(
                                               existing_model, existing_attr_name,
                                               type(existing_attr).__name__, migrated_class, migrated_attr, type(migrated_attr).__name__))
        if inconsistencies:
            # given these inconsistencies the checks below would not be informative
            return inconsistencies

        # constraint: related names and types of related attributes in existing_model and migrated_model classes must match
        related_attrs_to_check = ['related_name', 'primary_class', 'related_class']
        for existing_attr_name, existing_attr in existing_model_cls.Meta.attributes.items():
            migrated_class, migrated_attr = self._get_mapped_attribute(existing_model, existing_attr_name)
            if migrated_attr and isinstance(existing_attr, obj_tables.RelatedAttribute):
                migrated_attr = migrated_model_cls.Meta.attributes[migrated_attr]
                for rel_attr in related_attrs_to_check:
                    existing_rel_attr = getattr(existing_attr, rel_attr)
                    migrated_rel_attr = getattr(migrated_attr, rel_attr)
                    if isinstance(existing_rel_attr, str) and isinstance(migrated_rel_attr, str):
                        if existing_rel_attr != migrated_rel_attr:
                            inconsistencies.append("{}.{}.{} is '{}', which differs from the migrated value "
                                                   "of {}.{}.{}, which is '{}'".format(
                                                       existing_model, existing_attr_name, rel_attr,
                                                       existing_rel_attr, migrated_class, migrated_attr, rel_attr, migrated_rel_attr))
                    else:
                        # the attributes are models
                        existing_rel_attr_name = existing_rel_attr.__name__
                        migrated_rel_attr_name = migrated_rel_attr.__name__
                        if existing_rel_attr_name in self.deleted_models:
                            inconsistencies.append("existing model '{}' is not migrated, "
                                                   "but is referenced by migrated attribute {}.{}".format(existing_rel_attr_name,
                                                                                                          migrated_class, migrated_attr))
                        else:
                            expected_migrated_rel_attr = self.models_map[existing_rel_attr_name]
                            if migrated_rel_attr_name != expected_migrated_rel_attr:
                                inconsistencies.append("{}.{}.{} is '{}', which migrates to '{}', but it "
                                                       "differs from {}.{}.{}, which is '{}'".format(
                                                           existing_model, existing_attr_name, rel_attr,
                                                           existing_rel_attr_name, expected_migrated_rel_attr,
                                                           migrated_class, migrated_attr, rel_attr, migrated_rel_attr_name))
        return inconsistencies

    def _get_existing_model_order(self, existing_file):
        """ Obtain the sequence in which models appear in the source file

        First determine the order of existing model types in worksheets in the source file. However, the
        mapping of some worksheets to models may be ambiguous. Then map the order to the existing models
        that will migrate.

        Args:
            existing_file (:obj:`str`): pathname of file being migrated

        Returns:
            :obj:`list` of `obj_tables.core.ModelMeta`: existing models in the same order as worksheets
                in `existing_file`, followed by existing models with ambiguous sheet names
        """
        _, ext = os.path.splitext(existing_file)
        utils_reader = wc_utils.workbook.io.get_reader(ext)(existing_file)
        utils_reader.initialize_workbook()
        sheet_names = utils_reader.get_sheet_names()

        existing_models_migrating = [self.existing_defs[model_name] for model_name in self.models_map.keys()]

        # use the existing_file sheet names to establish the order of existing models
        model_order = [None] * len(sheet_names)
        ambiguous_models = []
        for existing_model in existing_models_migrating:
            try:
                sheet_name = WorkbookReader.get_model_sheet_name(sheet_names, existing_model)
                if sheet_name is not None:
                    model_order[sheet_names.index(sheet_name)] = existing_model
            except ValueError:
                ambiguous_models.append(existing_model)
        model_order = [element for element in model_order if element is not None]

        # append models with ambiguous sheets
        model_order.extend(ambiguous_models)

        return model_order

    def _migrate_model_order(self, model_order):
        """ Migrate the sequence of models from the existing order to a migrated order

        Map the order of existing models to an order for migrated models. The migrated order can be
        used to sequence worksheets or files in migrated file(s).

        Args:
            model_order (:obj:`list` of :obj:`obj_tables.core.ModelMeta`): order of existing models

        Returns:
            :obj:`list` of `obj_tables.core.ModelMeta`: migrated models in the same order as
                the corresponding existing models, followed by migrated models sorted by name
        """

        model_type_map = {}
        for existing_model, migrated_model in self.models_map.items():
            model_type_map[self.existing_defs[existing_model]] = self.migrated_defs[migrated_model]

        migrated_model_order = []
        for existing_model in model_order:
            try:
                migrated_model_order.append(model_type_map[existing_model])
            except KeyError:
                raise MigratorError("model '{}' not found in the model map".format(
                    existing_model.__name__))

        # append newly created models
        migrated_model_names = set(self.migrated_defs).difference(self.models_map.values())
        migrated_models = [self.migrated_defs[model_name] for model_name in sorted(migrated_model_names)]
        migrated_model_order.extend(migrated_models)

        return migrated_model_order
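
    # For example (names are illustrative): given models_map {'Compound': 'Molecule',
    # 'Reaction': 'Reaction'} and a newly created migrated model 'Pathway', an existing order
    # [Compound, Reaction] migrates to [Molecule, Reaction, Pathway]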

    @staticmethod
    def _get_models_with_worksheets(models):
        """ Select subset of `models` that are stored in a worksheet or file

        Args:
            models (:obj:`dict` of :obj:`obj_tables.core.ModelMeta`): model classes keyed by name

        Returns:
            :obj:`list` of :obj:`obj_tables.core.ModelMeta`: the models in `models` that are stored
                in their own worksheet or file
        """
        return [model for model in models.values()
                if model.Meta.table_format not in [TableFormat.cell, TableFormat.multiple_cells]]

    def read_existing_file(self, existing_file):
        """ Read models from existing file

        Does not perform validation -- data in existing model file must be already validated with
        the existing schema

        Args:
            existing_file (:obj:`str`): pathname of file to migrate

        Returns:
            :obj:`list` of `obj_tables.Model`: the models in `existing_file`
        """
        Reader = self.io_classes['reader']
        # ignore_sheet_order because models obtained by inspect.getmembers() are returned in name order
        # data in model files must be already validated with the existing schema
        existing_models = Reader().run(existing_file, models=self._get_models_with_worksheets(self.existing_defs),
                                       allow_multiple_sheets_per_model=True,
                                       ignore_attribute_order=True,
                                       ignore_sheet_order=True,
                                       include_all_attributes=False,
                                       validate=False)
        if isinstance(existing_models, dict):
            models = []
            for obj_list in existing_models.values():
                models.extend(obj_list)
            existing_models = models
        existing_models = obj_tables.core.Model.get_all_related(existing_models)
        return existing_models

    def migrate(self, existing_models):
        """ Migrate existing model instances to the migrated schema

        Args:
            existing_models (:obj:`list` of :obj:`obj_tables.Model`): the models being migrated

        Returns:
            :obj:`list` of `obj_tables.Model`: the migrated models
        """
        all_models = self._deep_migrate(existing_models)
        self._connect_models(all_models)
        migrated_models = [migrated_model for _, migrated_model in all_models]
        return migrated_models

    @staticmethod
    def path_of_migrated_file(existing_file, migrate_suffix=None, migrate_in_place=False):
        """ Determine the pathname of the migrated file

        Args:
            existing_file (:obj:`str`): pathname of file being migrated
            migrate_suffix (:obj:`str`, optional): suffix of automatically created migrated filename;
                default is `Migrator.MIGRATE_SUFFIX`
            migrate_in_place (:obj:`bool`, optional): if set, migrated file is `existing_file`, which
                will be overwritten

        Returns:
            :obj:`str`: name of migrated file
        """
        if migrate_in_place:
            return existing_file
        root, ext = os.path.splitext(existing_file)
        if migrate_suffix is None:
            migrate_suffix = Migrator.MIGRATE_SUFFIX
        migrated_file = os.path.join(os.path.dirname(existing_file),
                                     os.path.basename(root) + migrate_suffix + ext)
        return migrated_file
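
    # For example, path_of_migrated_file('dir/data.xlsx') returns 'dir/data_migrated.xlsx', while
    # path_of_migrated_file('dir/data.xlsx', migrate_in_place=True) returns 'dir/data.xlsx'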

    # todo: unittest
    def write_migrated_file(self, migrated_models, model_order, existing_file, migrated_file=None,
                            migrate_suffix=None, migrate_in_place=False):
        """ Write migrated models to an external representation

        Does not perform validation -- validation must be performed independently.

        Args:
            migrated_models (:obj:`list` of :obj:`obj_tables.Model`): the migrated models
            model_order (:obj:`list` of :obj:`obj_tables.core.ModelMeta`): migrated models in the
                order they should appear in a workbook
            existing_file (:obj:`str`): pathname of file that is being migrated
            migrated_file (:obj:`str`, optional): pathname of migrated file; if not provided, save
                migrated file with migrated suffix in same directory as source file
            migrate_suffix (:obj:`str`, optional): suffix of automatically created migrated filename;
                default is :obj:`Migrator.MIGRATE_SUFFIX`
            migrate_in_place (:obj:`bool`, optional): if set, overwrite `existing_file` with the
                migrated file and ignore `migrated_file` and :obj:`migrate_suffix`

        Returns:
            :obj:`str`: name of migrated file

        Raises:
            :obj:`MigratorError`: if migrate_in_place is False and writing the migrated file would
                overwrite an existing file
        """
        if not migrated_file:
            migrated_file = self.path_of_migrated_file(existing_file,
                                                       migrate_suffix=migrate_suffix, migrate_in_place=migrate_in_place)

        if not migrate_in_place and os.path.exists(migrated_file):
            raise MigratorError("migrated file '{}' already exists".format(migrated_file))

        # write migrated models to disk
        Writer = self.io_classes['writer']
        Writer().run(migrated_file, migrated_models, models=model_order, validate=False,
                     data_repo_metadata=True)
        return migrated_file

    def _migrate_with_transformations(self, existing_models):
        """ Migrate existing models, and transform them with methods in `transformations`, if they're defined

        Args:
            existing_models (:obj:`list` of :obj:`obj_tables.Model`): the existing models

        Returns:
            :obj:`list` of :obj:`obj_tables.Model`: the migrated models, including any transformations
                by `transformations`
        """
        # if it exists, execute prepare_existing_models in `transformations`
        if self.transformations:
            self.transformations.prepare_existing_models(self, existing_models)
        for count_uninitialized_attrs in self._check_models(existing_models):
            warn(count_uninitialized_attrs, MigrateWarning)
        migrated_models = self.migrate(existing_models)
        # if it exists, execute modify_migrated_models in `transformations`
        if self.transformations:
            self.transformations.modify_migrated_models(self, migrated_models)
        return migrated_models

    def _write(self, existing_file, migrated_models, schema_metadata_model=None, migrated_file=None,
               migrate_suffix=None, migrate_in_place=False):
        """ Write migrated models to a migrated file

        :obj:`Migrator` doesn't use the `schema_package` argument to :obj:`Writer.run` because the directory
        containing the migrated schema repo is not on :obj:`sys.path`.

        Args:
            existing_file (:obj:`str`): pathname of file being migrated
            migrated_models (:obj:`list` of :obj:`obj_tables.Model`): the migrated models
            schema_metadata_model (:obj:`SchemaRepoMetadata`, optional): a git metadata model for the
                migrated models schema
            migrated_file (:obj:`str`, optional): pathname of migrated file; if not provided,
                overwrite :obj:`existing_file` or save migrated file with migrated suffix in same directory
                as existing file
            migrate_suffix (:obj:`str`, optional): suffix of automatically created migrated filename;
                default is :obj:`Migrator.MIGRATE_SUFFIX`
            migrate_in_place (:obj:`bool`, optional): if set, overwrite :obj:`existing_file` with the
                migrated file and ignore :obj:`migrated_file` and :obj:`migrate_suffix`

        Returns:
            :obj:`str`: name of the migrated file

        Raises:
            :obj:`MigratorError`: if migrate_in_place is False and writing the migrated file would
                overwrite an existing file
        """
        # get sequence of migrated models in workbook of existing file
        existing_model_order = self._get_existing_model_order(existing_file)
        migrated_model_order = self._migrate_model_order(existing_model_order)
        # write schema git metadata, so the file can be migrated again
        if schema_metadata_model:
            # put SchemaRepoMetadata at front of migrated_model_order
            migrated_model_order.insert(0, SchemaRepoMetadata)
            migrated_models.append(schema_metadata_model)
        return self.write_migrated_file(migrated_models, migrated_model_order, existing_file,
                                        migrated_file=migrated_file, migrate_suffix=migrate_suffix, migrate_in_place=migrate_in_place)

    def full_migrate(self, existing_file, migrated_file=None, migrate_suffix=None, migrate_in_place=False):
        """ Migrate data from an existing file to a migrated file

        Args:
            existing_file (:obj:`str`): pathname of file to migrate
            migrated_file (:obj:`str`, optional): pathname of migrated file; if not provided,
                save migrated file with migrated suffix in same directory as existing file
            migrate_suffix (:obj:`str`, optional): suffix of automatically created migrated filename;
                default is :obj:`Migrator.MIGRATE_SUFFIX`
            migrate_in_place (:obj:`bool`, optional): if set, overwrite :obj:`existing_file` with the
                migrated file and ignore :obj:`migrated_file` and :obj:`migrate_suffix`

        Returns:
            :obj:`str`: name of migrated file

        Raises:
            :obj:`MigratorError`: if migrate_in_place is False and writing the migrated file would
                overwrite an existing file
        """
        existing_models = self.read_existing_file(existing_file)
        migrated_models = self._migrate_with_transformations(existing_models)
        return self._write(existing_file, migrated_models, migrated_file=migrated_file,
                           migrate_suffix=migrate_suffix, migrate_in_place=migrate_in_place)

    def _check_model(self, existing_model, existing_model_def):
        """ Check a model instance against its definition

        Args:
            existing_model (:obj:`obj_tables.Model`): the existing model
            existing_model_def (:obj:`obj_tables.core.ModelMeta`): type of the existing model

        Returns:
            :obj:`list`: uninitialized attributes in :obj:`existing_model`
        """
        uninitialized_attrs = []

        # are attributes in existing_model_def missing or uninitialized in existing_model
        for attr_name, attr in existing_model_def.Meta.attributes.items():
            if not hasattr(existing_model, attr_name) or \
                    getattr(existing_model, attr_name) is attr.get_default_cleaned_value():
                uninitialized_attrs.append("instance(s) of existing model '{}' lack(s) '{}' non-default value".format(
                    existing_model_def.__name__, attr_name))

        return uninitialized_attrs

    def _check_models(self, existing_models):
        """ Check existing model instances against their definitions

        Args:
            existing_models (:obj:`list` of :obj:`obj_tables.Model`): the models being migrated

        Returns:
            :obj:`list`: counts of uninitialized attributes in :obj:`existing_models`
        """
        existing_models_dict = dict_by_class(existing_models)
        uninitialized_attrs = []
        for existing_model_def, models in existing_models_dict.items():
            for existing_model in models:
                uninitialized_attrs.extend(self._check_model(existing_model, existing_model_def))
        counts_uninitialized_attrs = []
        for uninitialized_attr, count in det_count_elements(uninitialized_attrs):
            counts_uninitialized_attrs.append("{} {}".format(count, uninitialized_attr))
        return counts_uninitialized_attrs
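
    # e.g. (model and attribute names are illustrative): if 3 'Compound' instances lack a
    # non-default 'mass', _check_models returns
    # ["3 instance(s) of existing model 'Compound' lack(s) 'mass' non-default value"]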

    def _migrate_model(self, existing_model, existing_model_def, migrated_model_def):
        """ Migrate a model instance's non-related attributes

        Args:
            existing_model (:obj:`obj_tables.Model`): an existing model instance
            existing_model_def (:obj:`obj_tables.core.ModelMeta`): type of the existing model
            migrated_model_def (:obj:`obj_tables.core.ModelMeta`): type of the migrated model

        Returns:
            :obj:`obj_tables.Model`: a :obj:`migrated_model_def` instance migrated from :obj:`existing_model`
        """
        migrated_model = migrated_model_def()

        # copy non-Related attributes from existing_model to migrated_model
        for attr in existing_model_def.Meta.attributes.values():
            val = getattr(existing_model, attr.name)

            # skip attributes that do not get migrated to migrated_model_def
            _, migrated_attr = self._get_mapped_attribute(existing_model_def, attr)
            if migrated_attr is None:
                continue

            if not isinstance(attr, obj_tables.RelatedAttribute):
                if val is None:
                    copy_val = val
                elif isinstance(val, (str, bool, int, float, Enum, )):
                    copy_val = val
                elif isinstance(attr, obj_tables.sci.onto.OntoTermAttribute):
                    # pronto does not properly implement deepcopy
                    # temporarily share refs to OntoTermAttribute between existing and migrated models
                    copy_val = val
                else:
                    copy_val = copy.deepcopy(val)

                setattr(migrated_model, migrated_attr, copy_val)

        # save a reference to migrated_model in existing_model, which is used by _connect_models()
        setattr(existing_model, self._migrated_copy_attr_name, migrated_model)
        return migrated_model

    def _migrate_expression(self, existing_analyzed_expr):
        """ Migrate a model instance's :obj:`ParsedExpression.expression`

        The :obj:`ParsedExpression` syntax supports model type names in a Name.model_id notation.
        If a model type name changes then these must be migrated.
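
        For example, if model type ``Test`` is renamed ``MigratedTest``, then an expression
        ``Test.test_id + 3`` is migrated to ``MigratedTest.test_id + 3``.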

        Args:
            existing_analyzed_expr (:obj:`ParsedExpression`): an existing model's :obj:`ParsedExpression`

        Returns:
            :obj:`str`: a migrated :obj:`existing_analyzed_expr.expression`
        """
        from obj_tables.math.expression import ObjTablesTokenCodes

        changed_model_types = {}
        for existing_model_type in existing_analyzed_expr.related_objects.keys():
            existing_model_type_name = existing_model_type.__name__
            if existing_model_type_name != self.models_map[existing_model_type_name]:
                changed_model_types[existing_model_type_name] = self.models_map[existing_model_type_name]

        if not changed_model_types:
            # migration doesn't change data; if model type names didn't change then expression hasn't either
            return existing_analyzed_expr.expression
        else:
            # rename changed model type names used in ModelType.id notation
            wc_token_strings = []
            for wc_token in existing_analyzed_expr._obj_tables_tokens:
                wc_token_string = wc_token.token_string
                if wc_token.code == ObjTablesTokenCodes.obj_id and \
                        wc_token.model_type.__name__ in changed_model_types and \
                        wc_token_string.startswith(wc_token.model_type.__name__ + '.'):

                    # the token uses disambiguated ModelType.id notation; rename its model type prefix
                    migrated_model_type_name = changed_model_types[wc_token.model_type.__name__]
                    migrated_wc_token_string = wc_token_string.replace(wc_token.model_type.__name__ + '.',
                                                                       migrated_model_type_name + '.')
                    wc_token_strings.append(migrated_wc_token_string)
                else:
                    wc_token_strings.append(wc_token_string)

            migrated_expr_wo_spaces = ''.join(wc_token_strings)
            return existing_analyzed_expr.recreate_whitespace(migrated_expr_wo_spaces)

    def _migrate_analyzed_expr(self, existing_model, migrated_model, migrated_models):
        """ Run the migration of a model instance's :obj:`ParsedExpression` attribute, if it has one

        This must be done after all migrated models have been created. The migrated :obj:`ParsedExpression`
        is assigned to the appropriate attribute in :obj:`migrated_model`.

        Args:
            existing_model (:obj:`obj_tables.Model`): the existing model
            migrated_model (:obj:`obj_tables.Model`): the corresponding migrated model
            migrated_models (:obj:`dict`): dict of Models; maps migrated model type to a dict mapping
                migrated model ids to migrated model instances

        Raises:
            :obj:`MigratorError`: if the :obj:`ParsedExpression` in :obj:`existing_model` cannot be migrated
        """
        if hasattr(existing_model, self.PARSED_EXPR):
            from obj_tables.math.expression import ParsedExpression

            existing_analyzed_expr = getattr(existing_model, self.PARSED_EXPR)
            migrated_attribute = self.PARSED_EXPR
            migrated_expression = self._migrate_expression(existing_analyzed_expr)
            migrated_given_model_types = []
            for existing_model_type in existing_analyzed_expr.related_objects.keys():
                migrated_given_model_types.append(self.migrated_defs[self.models_map[existing_model_type.__name__]])
            parsed_expr = ParsedExpression(migrated_model.__class__, migrated_attribute, migrated_expression, migrated_models)
            _, _, errors = parsed_expr.tokenize()
            if errors:
                raise MigratorError('\n'.join(errors))
            setattr(migrated_model, self.PARSED_EXPR, parsed_expr)

    def _migrate_all_analyzed_exprs(self, all_models):
        """ Migrate all model instances' :obj:`ParsedExpression`\ s

        This must be done after all migrated models have been created.

        Args:
            all_models (:obj:`list` of :obj:`tuple`): pairs of corresponding existing and migrated model instances

        Raises:
            :obj:`MigratorError`: if multiple instances of a model type have the same id
        """
        errors = []
        migrated_models = {}
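        # build map: migrated model type -> {migrated model id -> migrated model instance}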
        for _, migrated_model in all_models:
            if migrated_model.__class__ not in migrated_models:
                migrated_models[migrated_model.__class__] = {}
            # ignore models that do not have an 'id' attribute
            if hasattr(migrated_model, 'id'):
                id = migrated_model.id
                if id in migrated_models[migrated_model.__class__]:
                    errors.append("model type '{}' has duplicated id: '{}' ".format(
                        migrated_model.__class__.__name__, id))
                migrated_models[migrated_model.__class__][id] = migrated_model
        if errors:
            raise MigratorError('\n'.join(errors))

        for existing_model, migrated_model in all_models:
            self._migrate_analyzed_expr(existing_model, migrated_model, migrated_models)

    def _deep_migrate(self, existing_models):
        """ Migrate all model instances from the existing schema to the migrated schema

        Supports:

        * delete attributes from the existing schema
        * add attributes in the migrated schema
        * add model definitions in the migrated schema
        * models with expressions

        Assumes that otherwise the schemas are identical

        Args:
            existing_models (:obj:`list` of :obj:`obj_tables.Model`): the existing models

        Returns:
            :obj:`list`: list of (existing model, corresponding migrated model) pairs
        """
        existing_schema = self.existing_defs
        migrated_schema = self.migrated_defs

        existing_migrated_pairs = []
        for existing_model in existing_models:
            existing_class_name = existing_model.__class__.__name__

            # do not migrate model instances whose classes are not in the migrated schema
            if existing_class_name in self.deleted_models:
                continue

            migrated_class_name = self.models_map[existing_class_name]
            migrated_model = self._migrate_model(existing_model, existing_schema[existing_class_name],
                                                 migrated_schema[migrated_class_name])
            existing_migrated_pairs.append((existing_model, migrated_model))
        self._migrate_all_analyzed_exprs(existing_migrated_pairs)
        return existing_migrated_pairs

    def _connect_models(self, existing_migrated_pairs):
        """ Connect migrated model instances

        Migrate :obj:`obj_tables.RelatedAttribute` connections among existing models to migrated models

        Args:
            existing_migrated_pairs (:obj:`list` of :obj:`tuple`): pairs of corresponding existing and
                migrated model instances
        """
        for existing_model, migrated_model in existing_migrated_pairs:
            existing_model_cls = existing_model.__class__
            for attr_name, attr in existing_model_cls.Meta.attributes.items():

                # skip attributes that are not in migrated_model_def
                _, migrated_attr = self._get_mapped_attribute(existing_model_cls, attr)
                if migrated_attr is None:
                    continue

                if isinstance(attr, obj_tables.RelatedAttribute):
                    val = getattr(existing_model, attr_name)
                    if val is None:
                        migrated_val = val
                    elif isinstance(val, obj_tables.core.Model):
                        migrated_val = getattr(val, self._migrated_copy_attr_name)
                    elif isinstance(val, (set, list, tuple)):
                        migrated_val = []
                        for model in val:
                            migrated_val.append(getattr(model, self._migrated_copy_attr_name))
                    else:
                        # unreachable due to other error checking
                        raise MigratorError('Invalid related attribute value')  # pragma: no cover

                    setattr(migrated_model, migrated_attr, migrated_val)

        for existing_model, migrated_model in existing_migrated_pairs:
            # delete the reference to migrated_model in existing_model
            delattr(existing_model, self._migrated_copy_attr_name)

    def run(self, files):
        """ Migrate some files

        Args:
            files (:obj:`list` of :obj:`str`): names of model files to migrate

        Returns:
            :obj:`list` of :obj:`str`: names of the migrated files
        """
        migrated_files = []
        for file in files:
            migrated_files.append(self.full_migrate(normalize_filename(file)))
        return migrated_files

    def __str__(self):
        """ Get string representation

        Returns:
            :obj:`str`: string representation of a :obj:`Migrator`; collections attributes are rendered
                by :obj:`pformat`
        """
        rv = []
        for attr in self.SCALAR_ATTRS:
            if hasattr(self, attr) and getattr(self, attr) is not None:
                rv.append("{}: {}".format(attr, getattr(self, attr)))
        for attr in self.COLLECTIONS_ATTRS:
            if hasattr(self, attr):
                rv.append("{}:\n{}".format(attr, pformat(getattr(self, attr))))
        return '\n'.join(rv)


class MigrationWrapper(ABC):
    """ Interface for classes that define a pair of methods that can modify :obj:`obj_tables.Model`\ s being migrated

    :obj:`MigrationWrapper` defines the interface used by a `transformations` wrapper. If a
    `transformations` is defined, migration uses its :obj:`prepare_existing_models` to modify
    existing models just before they are migrated, and uses its :obj:`modify_migrated_models`
    to modify migrated models just after they are migrated.

    A `transformations` wrapper can be associated with each :obj:`SchemaChanges`, and is obtained
    from the transformations file configured in a schema changes file.
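
    For example, a minimal sketch of a wrapper module (the class name and modifications are
    illustrative); :obj:`load_transformations` requires that the module define an instance
    named ``transformations``::

        class CustomWrapper(MigrationWrapper):

            def prepare_existing_models(self, migrator, existing_models):
                # modify existing models just before they are migrated
                ...

            def modify_migrated_models(self, migrator, migrated_models):
                # modify migrated models just after they are migrated
                ...

        transformations = CustomWrapper()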
    """

    @abstractmethod
    def prepare_existing_models(self, migrator, existing_models):    # pragma: no cover
        """ Prepare existing models before migration

        Args:
            migrator (:obj:`Migrator`): the :obj:`Migrator` calling this method
            existing_models (:obj:`list` of :obj:`obj_tables.Model`): the models that will be migrated
        """
        pass

    @abstractmethod
    def modify_migrated_models(self, migrator, migrated_models):    # pragma: no cover
        """ Modify migrated models after migration

        Args:
            migrator (:obj:`Migrator`): the :obj:`Migrator` calling this method
            migrated_models (:obj:`list` of :obj:`obj_tables.Model`): all models that have been migrated
        """
        pass

    @staticmethod
    def import_migration_wrapper(migration_wrapper_file):
        """ Import the migration wrapper instances defined in a Python file

        Args:
            migration_wrapper_file (:obj:`str`): name of a file that defines migration wrappers

        Returns:
            :obj:`dict`: the :obj:`MigrationWrapper` instances in :obj:`migration_wrapper_file`, keyed by their
                attribute names

        Raises:
            :obj:`MigratorError`: if :obj:`migration_wrapper_file` cannot be imported
        """
        # import MigrationWrappers from migration_wrapper_file
        dirname = os.path.dirname(migration_wrapper_file)
        schema_module = SchemaModule(migration_wrapper_file, dir=dirname)
        imported_module = schema_module.import_module_for_migration(validate=False)

        # confirm that it defines some MigrationWrapper instances
        migration_wrappers = {}
        for attr in dir(imported_module):
            if isinstance(getattr(imported_module, attr), MigrationWrapper):
                migration_wrappers[attr] = getattr(imported_module, attr)

        if not migration_wrappers:
            raise MigratorError("'{}' does not define any MigrationWrapper instances".format(
                migration_wrapper_file))

        return migration_wrappers


class MigrationSpec(object):
    """ Specification of a sequence of migrations for a list of existing files

    Attributes:
        _REQUIRED_ATTRS (:obj:`list` of :obj:`str`): required attributes in a :obj:`MigrationSpec`
        _CHANGES_LISTS (:obj:`list` of :obj:`str`): lists of changes in a migration
        _ALLOWED_ATTRS (:obj:`list` of :obj:`str`): attributes allowed in a :obj:`MigrationSpec`
        name (:obj:`str`): name for this :obj:`MigrationSpec`
        existing_files (:obj:`list` of :obj:`str`, optional): existing files to migrate
        schema_files (:obj:`list` of :obj:`str`, optional): list of Python files containing model
            definitions for each state in a sequence of migrations
        seq_of_renamed_models (:obj:`list` of :obj:`list`, optional): list of renamed models for use
            by a :obj:`Migrator` for each migration in a sequence of migrations
        seq_of_renamed_attributes (:obj:`list` of :obj:`list`, optional): list of renamed attributes
            for use by a :obj:`Migrator` for each migration in a sequence
        seq_of_transformations (:obj:`list` of :obj:`MigrationWrapper`, optional): list of transformations
            for use by a :obj:`Migrator` for each migration in a sequence
        migrated_files (:obj:`list` of :obj:`str`, optional): migration destination files in 1-to-1
            correspondence with :obj:`existing_files`; if not provided, migrated files use a suffix or
            are migrated in place
        io_classes (:obj:`dict` of :obj:`type`, optional): reader and/or writer for I/O of existing and
            migrated files, respectively; IMPORTANT NOTE: `io_classes` are imported and loaded from
            current schema repositories by :obj:`DataSchemaMigration`
        migrate_suffix (:obj:`str`, optional): suffix added to destination file name, before the file type suffix
        migrate_in_place (:obj:`bool`, optional): whether to migrate in place
        migrations_config_file (:obj:`str`, optional): if created from a configuration file, the file's
            path
        final_schema_branch (:obj:`str`, optional): branch of the final schema repo
        final_schema_url (:obj:`str`, optional): base url of the final schema repo
        final_schema_hash (:obj:`str`, optional): hash of the head commit of the final schema repo
        final_schema_git_metadata (:obj:`SchemaRepoMetadata`, optional): a git metadata model for the repo
            containing the last schema in the migration; may be initialized directly, or constructed
            from the other ``final_schema_*`` attributes
        _prepared (:obj:`bool`, optional): whether this :obj:`MigrationSpec` has been prepared
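
    For example, a minimal sketch of programmatic use (the file names are illustrative)::

        migration_spec = MigrationSpec('example migration',
                                       existing_files=['data.xlsx'],
                                       schema_files=['schema_v1.py', 'schema_v2.py'])
        migration_spec.prepare()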
    """

    _REQUIRED_ATTRS = ['name', 'existing_files', 'schema_files']
    _CHANGES_LISTS = ['seq_of_renamed_models', 'seq_of_renamed_attributes', 'seq_of_transformations']
    _ALLOWED_ATTRS = _REQUIRED_ATTRS + _CHANGES_LISTS + ['migrated_files', 'io_classes',
                                                         'migrate_suffix', 'migrate_in_place', 'migrations_config_file',
                                                         'final_schema_branch', 'final_schema_url',
                                                         'final_schema_hash', 'final_schema_git_metadata',
                                                         '_prepared']

    def __init__(self, name, **kwargs):
        for attr in self._ALLOWED_ATTRS:
            setattr(self, attr, None)

        # check for extra attributes
        extra_attrs = set(kwargs) - set(self._ALLOWED_ATTRS)
        if extra_attrs:
            raise MigratorError("initialization of MigrationSpec supports ({}) but extra attribute(s) "
                                "were provided: {}".format(set(self._ALLOWED_ATTRS), extra_attrs))

        # initialize attributes provided
        for attr, value in kwargs.items():
            setattr(self, attr, value)

        self.name = name
        self._prepared = False

    def prepare(self):
        """ Validate and standardize this :obj:`MigrationSpec`

        Raises:
            :obj:`MigratorError`: if `migrations_config_file` cannot be read, or the migration specifications in
                `migrations_config_file` are not valid
        """
        migration_errors = self.validate()
        if migration_errors:
            raise MigratorError('\n'.join(migration_errors))
        self.standardize()
        self._prepared = True

    def is_prepared(self):
        """ Check that this :obj:`MigrationSpec` has been prepared

        Raises:
            :obj:`MigratorError`: if this :obj:`MigrationSpec` has not been prepared
        """
        if not self._prepared:
            raise MigratorError("MigrationSpec '{}' is not prepared".format(self.name))

    @classmethod
    def load(cls, migrations_config_file):
        """ Create a list of validated and standardized :obj:`MigrationSpec`\ s from a migrations configuration file

        Args:
            migrations_config_file (:obj:`str`): pathname of migrations configuration in YAML file

        Returns:
            :obj:`dict`: map from the name of each migration to its validated :obj:`MigrationSpec`

        Raises:
            :obj:`MigratorError`: if :obj:`migrations_config_file` cannot be read, or the migration
                specifications in :obj:`migrations_config_file` are not valid
        """
        migration_specs = cls.get_migrations_config(migrations_config_file)

        migration_errors = []
        for migration_spec_obj in migration_specs.values():
            migration_errors.extend(migration_spec_obj.validate())
        if migration_errors:
            raise MigratorError('\n'.join(migration_errors))
        for migration_spec_obj in migration_specs.values():
            migration_spec_obj.standardize()
            migration_spec_obj._prepared = True

        return migration_specs

    @staticmethod
    def get_migrations_config(migrations_config_file):
        """ Create a list of :obj:`MigrationSpec`\ s from a migrations configuration file

        Args:
            migrations_config_file (:obj:`str`): pathname of migrations configuration in YAML file

        Returns:
            :obj:`dict`: map from the name of each migration to its :obj:`MigrationSpec`

        Raises:
            :obj:`MigratorError`: if :obj:`migrations_config_file` cannot be read
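
        A migrations configuration file is a YAML document that maps the name of each migration
        to its attributes; a minimal sketch (the file and model names are illustrative)::

            migration_with_renaming:
                existing_files: [data/file1.xlsx]
                schema_files: [core_v1.py, core_v2.py]
                seq_of_renamed_models: [[[Test, MigratedTest]]]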
        """
        try:
            fd = open(migrations_config_file, 'r')
        except FileNotFoundError:
            raise MigratorError("could not read migration config file: '{}'".format(migrations_config_file))

        try:
            migrations_config = yaml.load(fd, Loader=yaml.FullLoader)
        except yaml.YAMLError as e:
            raise MigratorError("could not parse YAML migration config file: '{}':\n{}".format(
                migrations_config_file, e))
        finally:
            fd.close()

        # parse the migrations config
        migration_specs = {}
        for migration_name, migration_desc in migrations_config.items():
            migration_spec = MigrationSpec(migration_name, migrations_config_file=migrations_config_file)
            for param, value in migration_desc.items():
                setattr(migration_spec, param, value)
            migration_specs[migration_name] = migration_spec

        return migration_specs

    def validate(self):
        """ Validate the attributes of a migration specification

        Returns:
            :obj:`list` of :obj:`str`: list of errors found
        """
        errors = []
        # check all attributes of self that aren't classes, methods, functions, or private
        members = inspect.getmembers(self, lambda a:
                                     not(inspect.isclass(a) or inspect.ismethod(a) or inspect.isfunction(a)))
        members = [attr for attr, value in members if not attr.startswith('_')]

        extra_attrs = set(members).difference(self._ALLOWED_ATTRS)
        if extra_attrs:
            errors.append("disallowed attribute(s) found: {}".format(extra_attrs))

        for required_attr in self._REQUIRED_ATTRS:
            if not hasattr(self, required_attr) or getattr(self, required_attr) is None:
                errors.append("missing required attribute '{}'".format(required_attr))
        if errors:
            return errors

        if len(self.schema_files) < 2:
            return ["a migration spec must contain at least 2 schemas, but it has only {}".format(
                len(self.schema_files))]

        for changes_list in self._CHANGES_LISTS:
            if getattr(self, changes_list) is not None:
                if len(getattr(self, changes_list)) != len(self.schema_files) - 1:
                    errors.append("{} must have 1 mapping for each of the {} migration(s) specified, "
                                  "but it has {}".format(changes_list, len(self.schema_files) - 1,
                                                         len(getattr(self, changes_list))))

        if self.seq_of_renamed_models:
            required_structure = "seq_of_renamed_models must be None, or a list of lists of pairs of strings"
            try:
                for renaming in self.seq_of_renamed_models:
                    if renaming is not None:
                        for pair in renaming:
                            if len(pair) != 2 or not isinstance(pair[0], str) or not isinstance(pair[1], str):
                                errors.append(required_structure)
            except TypeError as e:
                errors.append(required_structure + ", but examining it raises a '{}' error".format(str(e)))

        if self.seq_of_renamed_attributes:
            required_structure = ("seq_of_renamed_attributes must be None, or a list of lists of pairs "
                                  "of pairs of strings")
            try:
                for renamings in self.seq_of_renamed_attributes:
                    if renamings is not None:
                        for attribute_renaming in renamings:
                            for attr_spec in attribute_renaming:
                                if len(attr_spec) != 2 or not isinstance(attr_spec[0], str) or \
                                        not isinstance(attr_spec[1], str):
                                    errors.append(required_structure)
            except TypeError as e:
                errors.append(required_structure + ", but examining it raises a '{}' error".format(str(e)))

        if len(self.existing_files) < 1:
            errors.append("at least one existing file must be specified")

        # ensure that migrated_files is empty or specifies same count as existing_files
        if self.migrated_files is not None and len(self.migrated_files) != len(self.existing_files):
            errors.append("existing_files and migrated_files must provide 1-to-1 corresponding files, "
                          "but they have {} and {} entries, respectively".format(len(self.existing_files),
                                                                                 len(self.migrated_files)))

        return errors

    @staticmethod
    def _normalize_filenames(filenames, absolute_file=None):
        """ Normalize filenames relative to directory containing existing file

        Args:
            filenames (:obj:`list` of :obj:`str`): list of filenames
            absolute_file (:obj:`str`, optional): file whose directory contains files in :obj:`filenames`

        Returns:
            :obj:`list` of :obj:`str`: absolute paths for files in :obj:`filenames`
        """
        dir = None
        if absolute_file:
            dir = os.path.dirname(absolute_file)
        return [normalize_filename(filename, dir=dir) for filename in filenames]

    def standardize(self):
        """ Standardize the attributes of a :obj:`MigrationSpec`

        In particular, standardize a :obj:`MigrationSpec` that has been read from a YAML config file
        """
        # convert [model, attr] pairs in seq_of_renamed_attributes into tuples; needed for hashing
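        # e.g., [['Test', 'size'], ['MigratedTest', 'max_size']] becomes
        # (('Test', 'size'), ('MigratedTest', 'max_size'))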
        if self.seq_of_renamed_attributes:
            migrated_renamed_attributes = []
            for renamed_attrs_in_a_migration in self.seq_of_renamed_attributes:
                if renamed_attrs_in_a_migration == []:
                    migrated_renamed_attributes.append(None)
                    continue
                a_migration_renaming = []
                for existing, migrated in renamed_attrs_in_a_migration:
                    a_migration_renaming.append((tuple(existing), tuple(migrated)))
                migrated_renamed_attributes.append(a_migration_renaming)
            self.seq_of_renamed_attributes = migrated_renamed_attributes

        # if a changes_list isn't provided, replace it with a list of Nones indicating no changes
        empty_per_migration_list = [None] * (len(self.schema_files) - 1)
        for changes_list in self._CHANGES_LISTS:
            if getattr(self, changes_list) is None:
                setattr(self, changes_list, empty_per_migration_list)

        # normalize filenames
        if self.migrations_config_file:
            self.existing_files = self._normalize_filenames(self.existing_files,
                                                            absolute_file=self.migrations_config_file)
            self.schema_files = self._normalize_filenames(self.schema_files,
                                                          absolute_file=self.migrations_config_file)
            if self.migrated_files:
                self.migrated_files = self._normalize_filenames(self.migrated_files,
                                                                absolute_file=self.migrations_config_file)

        # convert schema metadata into a SchemaRepoMetadata object
        if not self.final_schema_git_metadata:
            if self.final_schema_url and self.final_schema_branch and self.final_schema_hash:
                self.final_schema_git_metadata = SchemaRepoMetadata(
                    url=self.final_schema_url,
                    branch=self.final_schema_branch,
                    revision=self.final_schema_hash
                )

    def expected_migrated_files(self):
        """ Provide names of migrated files that migration of this :obj:`MigrationSpec` would produce

        Returns:
            :obj:`list` of :obj:`str`: the names of the migrated files that a successful migration of this
                :obj:`MigrationSpec` will produce
        """
        if self.migrated_files:
            return self.migrated_files
        migrated_files = []
        for existing_file in self.existing_files:
            migrated_files.append(
                Migrator.path_of_migrated_file(existing_file, migrate_suffix=self.migrate_suffix,
                                               migrate_in_place=self.migrate_in_place))
        return migrated_files

    def __str__(self):
        """ Get str representation

        Returns:
            :obj:`str`: string representation of all allowed attributes in a :obj:`MigrationSpec`
        """
        rv = []
        for attr in self._ALLOWED_ATTRS:
            if attr in self._CHANGES_LISTS:
                rv.append("{}:\n{}".format(attr, pformat(getattr(self, attr))))
            else:
                rv.append("{}: {}".format(attr, getattr(self, attr)))
        return '\n'.join(rv)


class MigrationController(object):
    """ Manage migrations

    Manage migrations on several dimensions:

    * Migrate a single model file through a sequence of schemas
    * Perform migrations parameterized by a configuration file
    """

    @staticmethod
    def migrate_over_schema_sequence(migration_spec):
        """ Migrate some model files over a sequence of schemas

        Args:
            migration_spec (:obj:`MigrationSpec`): a migration specification

        Returns:
            :obj:`list` of :obj:`str`: the name of the migrated file for each file in
                `existing_files`

        Raises:
            :obj:`MigratorError`: if `schema_files`, `seq_of_renamed_models`, and
                `seq_of_renamed_attributes` are not consistent with each other
        """
        ms = migration_spec
        ms.is_prepared()

        # iterate over existing_files & migrated_files
        migrated_files = ms.migrated_files if ms.migrated_files else [None] * len(ms.existing_files)
        all_migrated_files = []
        for existing_file, migrated_file in zip(ms.existing_files, migrated_files):
            num_migrations = len(ms.schema_files) - 1
            # since 0 < num_migrations the loop body always executes, so branch coverage reports that
            # the 'for' line never jumps to the return; this cannot be suppressed with 'pragma: no cover'
            for i in range(num_migrations):
                # create Migrator for each pair of schemas

                migrator = Migrator(existing_defs_file=ms.schema_files[i],
                                    migrated_defs_file=ms.schema_files[i + 1],
                                    renamed_models=ms.seq_of_renamed_models[i],
                                    renamed_attributes=ms.seq_of_renamed_attributes[i],
                                    io_classes=ms.io_classes,
                                    transformations=ms.seq_of_transformations[i])
                migrator.prepare()
                # the 1st iteration reads models from the existing file; iteration n+1 uses the models set in iteration n
                if i == 0:
                    models = migrator.read_existing_file(existing_file)
                # migrate in memory until the last migration
                models = migrator._migrate_with_transformations(models)
                if i == num_migrations - 1:
                    # done migrating, write to file
                    actual_migrated_file = migrator._write(existing_file, models,
                                                           schema_metadata_model=ms.final_schema_git_metadata, migrated_file=migrated_file,
                                                           migrate_suffix=ms.migrate_suffix, migrate_in_place=ms.migrate_in_place)
                    all_migrated_files.append(actual_migrated_file)

        return all_migrated_files

    @staticmethod
    def migrate_from_spec(migration_spec):
        """ Perform the migration specified in a :obj:`MigrationSpec`

        Args:
            migration_spec (:obj:`MigrationSpec`): a migration specification

        Returns:
            :obj:`list` of :obj:`str`: migrated filenames
        """
        migration_spec.is_prepared()
        migrated_filenames = MigrationController.migrate_over_schema_sequence(migration_spec)
        return migrated_filenames

    @staticmethod
    def migrate_from_config(migrations_spec_config_file):
        """ Perform the migrations specified in a migrations spec config file

        Args:
            migrations_spec_config_file (:obj:`str`): a migrations spec configuration file, written
                in YAML

        Returns:
            :obj:`list` of :obj:`tuple`: list of (:obj:`MigrationSpec`, migrated filenames) pairs
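
        For example (the configuration file name is illustrative)::

            for spec, migrated_files in MigrationController.migrate_from_config('migrations.yaml'):
                print(spec.name, migrated_files)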
        """
        migration_specs = MigrationSpec.load(migrations_spec_config_file)
        results = []
        for migration_spec in migration_specs.values():
            results.append((migration_spec, MigrationController.migrate_from_spec(migration_spec)))
        return results


class SchemaChanges(object):
    """ Specification of the changes to a schema in a git commit

    More generally, a :obj:`SchemaChanges` should encode the set of changes to a schema over the sequence
    of git commits since the previous :obj:`SchemaChanges`.

    Attributes:
        _CHANGES_FILE_ATTRS (:obj:`list` of :obj:`str`): required attributes in a schema changes file
        _ATTRIBUTES (:obj:`list` of :obj:`str`): attributes in a :obj:`SchemaChanges` instance
        schema_repo (:obj:`GitRepo`): a Git repo that defines the data model (schema) of the data being
            migrated
        schema_changes_file (:obj:`str`): the schema changes file
        commit_hash (:obj:`str`): hash of a sentinel commit from a schema changes file
        renamed_models (:obj:`list`, optional): list of renamed models in the commit
        renamed_attributes (:obj:`list`, optional): list of renamed attributes in the commit
        transformations_file (:obj:`str`, optional): the name of a Python file containing transformations
        transformations (:obj:`MigrationWrapper`, optional): a wrapper that transforms models during
            migrations
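
    A schema changes file is a YAML document containing the attributes in :obj:`_CHANGES_FILE_ATTRS`;
    a hypothetical example (the hash and file name are illustrative)::

        commit_hash: 9703cb7b0b84e2c311c3f86a5e323a6f97dad166
        renamed_models: [[Test, MigratedTest]]
        renamed_attributes: [[[Test, size], [MigratedTest, max_size]]]
        transformations_file: transformations.py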
    """
    _CHANGES_FILE_ATTRS = ['commit_hash', 'renamed_models', 'renamed_attributes', 'transformations_file']

    _ATTRIBUTES = ['schema_repo', 'transformations', 'schema_changes_file', 'commit_hash',
                   'renamed_models', 'renamed_attributes', 'transformations_file']

    # template for the name of a schema changes file; the format placeholders are replaced
    # with the file's creation timestamp and the prefix of the commit's git hash, respectively
    _CHANGES_FILENAME_TEMPLATE = "schema_changes_{}_{}.yaml"
    _SHA1_LEN = 40

    def __init__(self, schema_repo=None, schema_changes_file=None, commit_hash=None, renamed_models=None,
                 renamed_attributes=None, transformations_file=None):
        self.schema_repo = schema_repo
        self.schema_changes_file = schema_changes_file
        self.commit_hash = commit_hash
        self.renamed_models = renamed_models
        self.renamed_attributes = renamed_attributes
        self.transformations_file = transformations_file
        self.transformations = self.load_transformations()

    def get_hash(self):
        """ Get the repo's current commit hash

        Returns:
            :obj:`str`: the hash
        """
        return self.schema_repo.latest_hash()

    @staticmethod
    def get_date_timestamp():
        """ Get a current date timestamp, with second resolution

        Returns:
            :obj:`str`: the timestamp
        """
        dt = datetime.datetime.now(tz=None)
        return dt.strftime("%Y-%m-%d-%H-%M-%S")

    @staticmethod
    def hash_prefix_from_sc_file(schema_changes_file):
        """ Get the hash prefix from a schema changes filename

        Args:
            schema_changes_file (:obj:`str`): the schema changes file

        Returns:
            :obj:`str`: the hash prefix in a schema changes filename
        """
        path = Path(schema_changes_file)
        return path.stem.split('_')[-1]

    @staticmethod
    def all_schema_changes_files(migrations_directory):
        """ Find all schema changes files in a git repo

        Args:
            migrations_directory (:obj:`str`): path to the migrations directory in a git repo

        Returns:
            :obj:`list` of :obj:`str`: pathnames of the schema changes files

        Raises:
            :obj:`MigratorError`: if no schema changes files are found
        """
        # use glob to search
        pattern = SchemaChanges._CHANGES_FILENAME_TEMPLATE.format('*', '*')
        files = list(Path(migrations_directory).glob(pattern))
        num_files = len(files)
        if not num_files:
            raise MigratorError("no schema changes files in '{}'".format(migrations_directory))
        return [str(file) for file in files]

    @staticmethod
    def all_schema_changes_with_commits(schema_repo):
        """ Instantiate all schema changes in a git repo

        Obtain all validated schema change files.

        Args:
            schema_repo (:obj:`GitRepo`): an initialized repo for the schema

        Returns:
            :obj:`tuple`: a :obj:`list` of errors, and a :obj:`list` of :obj:`SchemaChanges`
                instances for all validated schema changes files
        """
        errors = []
        schema_changes_with_commits = []
        migrations_directory = schema_repo.migrations_dir()
        for sc_file in SchemaChanges.all_schema_changes_files(migrations_directory):
            try:
                hash_prefix = SchemaChanges.hash_prefix_from_sc_file(sc_file)
                sc_dict = SchemaChanges.load(sc_file)
                commit_hash = sc_dict['commit_hash']
                if GitRepo.hash_prefix(commit_hash) != hash_prefix:
                    errors.append("hash prefix in schema changes filename '{}' inconsistent "
                                  "with hash in file: '{}'".format(sc_file, sc_dict['commit_hash']))
                    continue

                # ensure that the hash corresponds to a commit
                try:
                    schema_repo.get_commit(commit_hash)
                except MigratorError:
                    errors.append("the hash in '{}', which is '{}', isn't the hash of a commit".format(
                        sc_file, sc_dict['commit_hash']))
                    continue

                schema_changes_with_commits.append(SchemaChanges.generate_instance(sc_file))

            except MigratorError as e:
                errors.append(str(e))

        return errors, schema_changes_with_commits

    @staticmethod
    def find_file(schema_repo, commit_hash):
        """ Find a schema changes file in a git repo

        Args:
            schema_repo (:obj:`GitRepo`): an initialized repo for the schema
            commit_hash (:obj:`str`): hash of a sentinel commit

        Returns:
            :obj:`str`: the pathname of the file found

        Raises:
            :obj:`MigratorError`: if a file with the hash cannot be found, or multiple files
                have the hash
        """
        # search with glob
        pattern = SchemaChanges._CHANGES_FILENAME_TEMPLATE.format('*',
                                                                  GitRepo.hash_prefix(commit_hash))
        migrations_directory = schema_repo.migrations_dir()
        files = list(Path(migrations_directory).glob(pattern))
        num_files = len(files)
        if not num_files:
            raise MigratorError("no schema changes file in '{}' for hash {}".format(
                migrations_directory, commit_hash))
        if 1 < num_files:
            raise MigratorError("multiple schema changes files in '{}' for hash {}".format(
                migrations_directory, commit_hash))
        schema_changes_file = str(files[0])

        # ensure that hash in name and file are consistent
        sc_dict = SchemaChanges.load(schema_changes_file)
        if GitRepo.hash_prefix(sc_dict['commit_hash']) != GitRepo.hash_prefix(commit_hash):
            raise MigratorError("hash prefix in schema changes filename '{}' inconsistent "
                                "with hash in file: '{}'".format(schema_changes_file, sc_dict['commit_hash']))

        # ensure that the hash corresponds to a commit
        try:
            schema_repo.get_commit(commit_hash)
        except MigratorError:
            raise MigratorError("the hash in '{}', which is '{}', isn't the hash of a commit".format(
                schema_changes_file, sc_dict['commit_hash']))

        return schema_changes_file

    def generate_filename(self, commit_hash):
        """ Generate a filename for a template schema changes file

        Returns:
            :obj:`str`: the filename
        """
        return SchemaChanges._CHANGES_FILENAME_TEMPLATE.format(self.get_date_timestamp(),
                                                               GitRepo.hash_prefix(commit_hash))

    def make_template(self, schema_url=None, commit_hash=None):
        """ Make a template schema changes file

        The template contains the hash of the sentinel commit which it describes and empty values
        for the other :obj:`SchemaChanges` attributes.

        Args:
            schema_url (:obj:`str`, optional): URL of the schema repo; if not provided, `self.schema_repo`
                must be already initialized
            commit_hash (:obj:`str`, optional): hash of the sentinel commit in the schema repo which
                the template schema changes file identifies; default is the most recent commit

        Returns:
            :obj:`str`: pathname of the schema changes file that was written

        Raises:
            :obj:`MigratorError`: if a repo cannot be cloned from :obj:`schema_url`, or
                checked out from :obj:`commit_hash`, or
                the schema changes file already exists
        """
        if schema_url:
            # clone the schema at schema_url
            self.schema_repo = GitRepo(schema_url)
        else:
            if not self.schema_repo:
                raise MigratorError("schema_repo must be initialized")
        if commit_hash:
            # ensure that commit_hash exists
            self.schema_repo.get_commit(commit_hash)
        else:
            commit_hash = self.get_hash()

        filename = self.generate_filename(commit_hash)
        changes_file_dir = self.schema_repo.migrations_dir()
        # if changes_file_dir doesn't exist, make it
        os.makedirs(changes_file_dir, exist_ok=True)

        pathname = os.path.join(changes_file_dir, filename)
        if os.path.exists(pathname):
            raise MigratorError("schema changes file '{}' already exists".format(pathname))

        with open(pathname, 'w') as file:
            file.write(u'# schema changes file\n')
            file.write(u"# stored in '{}'\n\n".format(filename))
            # generate YAML content
            template_data = dict(
                commit_hash=commit_hash,
                renamed_models=[],
                renamed_attributes=[],
                transformations_file=''
            )
            file.write(yaml.dump(template_data))

        # add the config file to the git repo
        self.schema_repo.repo.index.add([pathname])
        return pathname

    @staticmethod
    def make_template_command(schema_dir, commit_hash=None):
        """ Make a template schema changes file with CLI input

        Args:
            schema_dir (:obj:`str`): directory of the schema repo
            commit_hash (:obj:`str`, optional): hash of the sentinel commit in the schema repo which
                the template schema changes file identifies; default is the most recent commit

        Returns:
            :obj:`str`: pathname of the schema changes file that was written

        Raises:
            :obj:`MigratorError`: if :obj:`schema_dir` is not a directory, or
                a git repo cannot be instantiated from it, or
                the schema changes file already exists
        """

        # extract URL of schema repo from local repo clone
        schema_dir = os.path.abspath(schema_dir)
        if not os.path.isdir(schema_dir):
            raise MigratorError("schema_dir is not a directory: '{}'".format(schema_dir))
        git_repo = GitRepo(schema_dir)

        # create template schema changes file
        schema_changes = SchemaChanges(git_repo)
        schema_changes_template_file = schema_changes.make_template(commit_hash=commit_hash)
        return schema_changes_template_file

    @staticmethod
    def load(schema_changes_file):
        """ Read a schema changes file

        Args:
            schema_changes_file (:obj:`str`): path to the schema changes file

        Returns:
            :obj:`dict`: the data in the schema changes file

        Raises:
            :obj:`MigratorError`: if the schema changes file cannot be found,
                or is not proper YAML,
                or does not have the right format,
                or does not contain any changes
        """
        try:
            fd = open(schema_changes_file, 'r')
        except FileNotFoundError:
            raise MigratorError("could not read schema changes file: '{}'".format(
                schema_changes_file))
        try:
            schema_changes = yaml.load(fd, Loader=yaml.FullLoader)
        except yaml.YAMLError as e:
            raise MigratorError("could not parse YAML schema changes file: '{}':\n{}".format(
                schema_changes_file, e))
        finally:
            fd.close()

        if not isinstance(schema_changes, dict) or \
                any([attr not in schema_changes for attr in SchemaChanges._CHANGES_FILE_ATTRS]):
            raise MigratorError("schema changes file '{}' must have a dict with these attributes:\n{}".format(
                schema_changes_file, ', '.join(SchemaChanges._CHANGES_FILE_ATTRS)))

        # report empty schema changes files (unmodified templates)
        if schema_changes['renamed_models'] == [] and \
                schema_changes['renamed_attributes'] == [] and \
                schema_changes['transformations_file'] == '':
            raise MigratorError("schema changes file '{}' is empty (an unmodified template)".format(
                schema_changes_file))

        schema_changes['schema_changes_file'] = schema_changes_file
        return schema_changes

    def load_transformations(self):
        """ Load a transformations wrapper if a transformations file is configured

        Returns:
            :obj:`MigrationWrapper`: the transformations wrapper, or :obj:`None` if no
                transformations file is configured

        Raises:
            :obj:`MigratorError`: if the transformations file does not define a :obj:`MigrationWrapper`
                instance called 'transformations'
        """
        # use MigrationWrapper to load transformations
        if self.transformations_file:
            transformations_file = os.path.join(os.path.dirname(self.schema_changes_file),
                                                self.transformations_file)
            wrappers = MigrationWrapper.import_migration_wrapper(transformations_file)
            if 'transformations' not in wrappers or \
                    not isinstance(wrappers['transformations'], MigrationWrapper):
                raise MigratorError("the 'transformations_file' in schema changes file '{}' must "
                                    "define a MigrationWrapper instance called 'transformations'".format(
                                        self.schema_changes_file))
            return wrappers['transformations']
        return None

    @staticmethod
    def validate(schema_changes_kwargs):
        """ Check that the attributes of the arguments to :obj:`SchemaChanges` have the right structure

        Args:
            schema_changes_kwargs (:obj:`dict`): kwargs arguments to :obj:`SchemaChanges` generated by loading a schema
                changes file

        Returns:
            :obj:`list`: errors in :obj:`schema_changes_kwargs`
        """
        # check types
        errors = []
        for str_attr in ['commit_hash', 'transformations_file']:
            if not isinstance(schema_changes_kwargs[str_attr], str):
                errors.append("{} must be a str, but is a(n) {}".format(
                    str_attr, type(schema_changes_kwargs[str_attr]).__name__))

        required_structure = "renamed_models must be a list of pairs of strings, but is '{}'"
        try:
            for renaming in schema_changes_kwargs['renamed_models']:
                if len(renaming) != 2 or not isinstance(renaming[0], str) or not isinstance(renaming[1], str):
                    errors.append(required_structure.format(schema_changes_kwargs['renamed_models']))
                    continue
        except TypeError as e:
            errors.append(required_structure.format(schema_changes_kwargs['renamed_models']) + ", "
                          "and examining it raises a(n) '{}' error".format(str(e)))

        required_structure = ("renamed_attributes must be a list of pairs of pairs of "
                              "strings, but is '{}'")
        renamed_attributes = schema_changes_kwargs['renamed_attributes']
        try:
            no_renamed_attributes_error = True
            for renaming in renamed_attributes:
                # a renaming is a pair of pairs, like [['ExistingModel', 'existing_attr'], ['MigratedModel', 'migrated_attr']]
                existing, migrated = renaming
                for attr_name in existing, migrated:
                    if (len(attr_name) != 2 or not isinstance(attr_name[0], str) or
                            not isinstance(attr_name[1], str)) and no_renamed_attributes_error:
                        errors.append(required_structure.format(renamed_attributes))
                        no_renamed_attributes_error = False
                        continue
        except (TypeError, ValueError) as e:
            errors.append(required_structure.format(renamed_attributes) + ", and examining "
                          "it raises a(n) '{}' error".format(str(e)))

        if errors:
            return errors

        if len(schema_changes_kwargs['commit_hash']) != SchemaChanges._SHA1_LEN:
            errors.append("commit_hash is '{}', which isn't the right length for a "
                          "git hash".format(schema_changes_kwargs['commit_hash']))

        return errors

    @staticmethod
    def generate_instance(schema_changes_file):
        """ Generate a :obj:`SchemaChanges` instance from a schema changes file

        Args:
            schema_changes_file (:obj:`str`): path to the schema changes file

        Returns:
            :obj:`SchemaChanges`: the :obj:`SchemaChanges` instance
        """
        schema_changes_kwargs = SchemaChanges.load(schema_changes_file)
        errors = SchemaChanges.validate(schema_changes_kwargs)
        if errors:
            raise MigratorError(
                "in schema changes file '{}':\n)".format(schema_changes_file, '\n'.join(errors)))
        return SchemaChanges(**schema_changes_kwargs)

    def __str__(self):
        """ Provide a string representation

        Returns:
            :obj:`str`: a string representation of this :obj:`SchemaChanges`
        """
        rv = []
        for attr in self._ATTRIBUTES:
            rv.append("{}: {}".format(attr, getattr(self, attr)))
        return '\n'.join(rv)


class GitRepo(object):
    """ Methods for processing a git repo and its commit history

    Attributes:
        repo_dir (:obj:`str`): the repo's root directory
        repo_url (:obj:`str`): the repo's URL, if provided
        branch (:obj:`str`): the repo's branch, if it was cloned
        repo (:obj:`git.Repo`): the `GitPython` repo
        commit_DAG (:obj:`nx.classes.digraph.DiGraph`): `NetworkX` DAG of the repo's commit history
        git_hash_map (:obj:`dict`): map from each git hash in the repo to its commit
        temp_dirs (:obj:`list` of :obj:`str`): temporary directories that hold repo clones
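
    For example, a hypothetical use that clones a repo into a temporary directory and later
    deletes it::

        git_repo = GitRepo('https://github.com/KarrLab/obj_tables.git')
        print(git_repo.repo_name())
        git_repo.del_temp_dirs()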
    """
    # default repo name if name not known
    _NAME_UNKNOWN = 'name_unknown'

    # name of an empty subdirectory in a temp dir that can be used as a destination for :obj:`shutil.copytree`
    EMPTY_SUBDIR = 'empty_subdir'

    _HASH_PREFIX_LEN = 7

    def __init__(self, repo_location=None, repo_url=None, branch='master', search_parent_directories=False):
        """ Initialize a :obj:`GitRepo` from an existing Git repo

        If :obj:`repo_location` is a directory then use the Git repo in that directory. Otherwise it
        must be a URL, and the repo is cloned into a temporary directory.

        Args:
            repo_location (:obj:`str`, optional): the location of the repo, either its directory or
                its URL
            repo_url (:obj:`str`, optional): the repo's original URL, which will be set in all
                copies of a repo that was initially cloned by :obj:`clone_repo_from_url`
            branch (:obj:`str`, optional): branch to clone if :obj:`repo_location` is a URL; default is
                ``master``
            search_parent_directories (:obj:`bool`, optional): :obj:`search_parent_directories` option to
                :obj:`git.Repo`; if set and :obj:`repo_location` is a directory, then all of its parent
                directories will be searched for a valid repo; default=False

        Raises:
            :obj:`MigratorError`: if a git repo cannot be instantiated or cloned from :obj:`repo_location`
        """
        self.commit_DAG = None
        self.git_hash_map = None
        self.temp_dirs = []
        self.repo = None
        self.repo_dir = None
        # todo: metadata: check the format of repo_url?
        self.repo_url = repo_url
        self.branch = branch
        if repo_location:
            if os.path.isdir(repo_location):
                try:
                    self.repo = git.Repo(repo_location, search_parent_directories=search_parent_directories)
                except git.exc.GitError:
                    raise MigratorError("instantiating a git.Repo from directory '{}' failed".format(
                        repo_location))
                self.repo_dir = os.path.dirname(self.repo.git_dir)
            else:
                self.clone_repo_from_url(repo_location, branch=branch)
            self.commit_DAG = self.commits_as_graph()

    def get_temp_dir(self):
        """ Get a temporary directory, which must eventually be deleted by calling :obj:`del_temp_dirs`

        Returns:
            :obj:`str`: the pathname to a temporary directory
        """
        temp_dir = tempfile.mkdtemp()
        self.temp_dirs.append(temp_dir)
        return temp_dir

    def del_temp_dirs(self):
        """ Delete the temp dirs created by :obj:`get_temp_dir`

        Returns:
            :obj:`str`: the pathname to a temporary directory
        """
        for temp_dir in self.temp_dirs:
            if os.path.isdir(temp_dir):
                shutil.rmtree(temp_dir)

    def clone_repo_from_url(self, url, branch='master', directory=None):
        """ Clone a repo from an URL

        Args:
            url (:obj:`str`): URL for the repo
            branch (:obj:`str`, optional): branch to clone; default is ``master``
            directory (:obj:`str`, optional): directory to hold the repo; if not provided, the repo
                is stored in a new temporary dir

        Returns:
            :obj:`tuple`: (:obj:`git.Repo`, :obj:`str`): the repo cloned, and its root directory

        Raises:
            :obj:`MigratorError`: if repo cannot be cloned from :obj:`url`
        """
        if directory is None:
            directory = self.get_temp_dir()
        elif not os.path.isdir(directory):
            raise MigratorError("'{}' is not a directory".format(directory))
        try:
            kwargs = {'branch': branch}
            repo = git.Repo.clone_from(url, directory, **kwargs)
        except Exception as e:
            raise MigratorError("repo cannot be cloned from '{}'\n{}".format(url, e))
        self.repo = repo
        self.repo_dir = directory
        # todo: metadata: check the format of repo_url?
        self.repo_url = url
        self.branch = branch
        return repo, directory

    def copy(self, tmp_dir=None):
        """ Copy this :obj:`GitRepo` into a new directory

        For better performance use ``copy()`` instead of ``GitRepo()`` or ``clone_repo_from_url()`` if you
        need multiple copies of a repo, such as multiple instances checked out to different commits.
        This is an optimization because copying is faster than cloning over the network.
        To avoid ``bytecode is stale`` errors, doesn't copy ``__pycache__`` directories.

        Args:
            tmp_dir (:obj:`str`, optional): directory to hold the repo; if not provided, a new temporary
                directory is created to store the repo

        Returns:
            :obj:`GitRepo`: a new :obj:`GitRepo` that's a copy of :obj:`self` in a new temporary directory
        """
        if tmp_dir is None:
            tmp_dir = self.get_temp_dir()
        elif not os.path.isdir(tmp_dir):
            raise MigratorError("'{}' is not a directory".format(tmp_dir))
        dst = os.path.join(tmp_dir, self.EMPTY_SUBDIR)
        if not self.repo_dir:
            raise MigratorError("cannot copy an empty GitRepo")
        shutil.copytree(self.repo_dir, dst, ignore=shutil.ignore_patterns('*__pycache__*'))
        return GitRepo(dst, repo_url=self.repo_url, branch=self.branch)
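
    # e.g., to examine an earlier commit without disturbing this GitRepo (sketch; the commit
    # hash variable is hypothetical):
    #   repo_copy = git_repo.copy()
    #   repo_copy.checkout_commit(earlier_commit_hash)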

    def migrations_dir(self):
        """ Get the repo's migrations directory

        Returns:
            :obj:`str`: the repo's migrations directory
        """
        return os.path.join(self.repo_dir, DataSchemaMigration._MIGRATIONS_DIRECTORY)

    def fixtures_dir(self):
        """ Get the repo's fixtures directory

        Returns:
            :obj:`str`: the repo's fixtures directory
        """
        return os.path.join(self.repo_dir, 'tests', 'fixtures')

    def repo_name(self):
        """ Get the repo's name

        Returns:
            :obj:`str`: the repo's name
        """
        if self.repo_url:
            split_url = self.repo_url.split('/')
            return split_url[-1]
        elif self.repo_dir:
            return os.path.basename(self.repo_dir)
        # todo: get the name even if the repo is in tmp dir by using the original location
        return self._NAME_UNKNOWN

    def head_commit(self):
        """ Get the repo's head commit

        Returns:
            :obj:`git.objects.commit.Commit`: the repo's latest commit
        """
        return self.repo.head.commit

    def latest_hash(self):
        """ Get the hash of the repo's head commit

        Returns:
            :obj:`str`: the head commit's SHA1 hash
        """
        return self.get_hash(self.head_commit())

    def get_commit(self, commit_or_hash):
        """ Obtain a commit from its hash

        Also, if :obj:`commit_or_hash` is a commit, simply return it.

        Args:
            commit_or_hash (:obj:`str` or :obj:`git.objects.commit.Commit`): the hash of a commit or a commit
                in the repo

        Returns:
            :obj:`git.objects.commit.Commit`: a commit

        Raises:
            :obj:`MigratorError`: if :obj:`commit_or_hash` is not a commit and cannot be converted into one
        """
        if isinstance(commit_or_hash, str):
            if commit_or_hash in self.git_hash_map:
                return self.git_hash_map[commit_or_hash]
        elif isinstance(commit_or_hash, git.objects.commit.Commit):
            return commit_or_hash
        raise MigratorError("commit_or_hash '{}' cannot be converted into a commit".format(commit_or_hash))

    def get_commits(self, commits_or_hashes):
        """ Get the commits with the given commits or hashes

        Args:
            commits_or_hashes (:obj:`list` of :obj:`str` or :obj:`git.objects.commit.Commit`): an
                iterable of commits or commit hashes

        Returns:
            :obj:`list` of :obj:`git.objects.commit.Commit`: list of the repo's commits with the
                commits or hashes in :obj:`commits_or_hashes`

        Raises:
            :obj:`MigratorError`: if any commit or hash in `commits_or_hashes` doesn't identify a
                commit in this repo
        """
        if not commits_or_hashes:
            return []
        commits = []
        bad_hashes = []
        for commit_or_hash in commits_or_hashes:
            try:
                commits.append(self.get_commit(commit_or_hash))
            except MigratorError:
                bad_hashes.append(commit_or_hash)
        if bad_hashes:
            raise MigratorError("No commit found for {}".format(", or ".join(bad_hashes)))
        return commits

    def commits_as_graph(self):
        """ Make a DAG for this repo's commit dependencies - edges point from dependent commit to parent commit

        The DAG contains all commits in the repo on which the latest commit depends. Also creates
        `git_hash_map`, a map from all git hashes to their commits.

        Returns:
            :obj:`nx.classes.digraph.DiGraph`: a DAG representing the repo commit history
        """
        self.git_hash_map = {}
        commit_graph = nx.DiGraph()
        latest = self.head_commit()
        self.git_hash_map[self.get_hash(latest)] = latest
        commits_to_explore = {latest}
        commits_explored = set()
        while commits_to_explore:
            commit = commits_to_explore.pop()
            for parent in commit.parents:
                # edges point from dependent commit to parent commit
                commit_graph.add_edge(commit, parent)
                self.git_hash_map[self.get_hash(parent)] = parent
                if parent not in commits_explored:
                    commits_to_explore.add(parent)
            commits_explored.add(commit)
        return commit_graph

    @staticmethod
    def hash_prefix(hash):
        """ Get a commit hash's prefix

        Args:
            hash (:obj:`str`): git commit hash

        Returns:
            :obj:`str`: hash's prefix
        """
        return hash[:GitRepo._HASH_PREFIX_LEN]

    @staticmethod
    def get_hash(commit):
        """ Get a commit's hash

        Args:
            commit (:obj:`git.objects.commit.Commit`): a commit

        Returns:
            :obj:`str`: the commit's SHA1 hash
        """
        return commit.hexsha

    def get_metadata(self, metadata_type):
        """ Create a metadata model that describes the current state of this repo

        Args:
            metadata_type (:obj:`class`): a subclass of `RepoMetadata`, either
                `DataRepoMetadata` or `SchemaRepoMetadata`

        Returns:
            :obj:`RepoMetadata`: a `RepoMetadata` object

        Raises:
            :obj:`MigratorError`: if the metadata isn't available in this repo
        """
        attrs = ['repo_url', 'branch', 'repo']
        missing = [a for a in attrs if not getattr(self, a)]
        if missing:
            raise MigratorError("can't get metadata for '{}' repo; uninitialized attributes: '{}'".format(
                self.repo_name(), ', '.join(missing)))
        repo_metadata_obj = metadata_type(
            url=self.repo_url,
            branch=self.branch,
            revision=self.latest_hash()
        )
        return repo_metadata_obj
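
    # e.g. (illustrative values): get_metadata(SchemaRepoMetadata) returns a SchemaRepoMetadata with
    # url='https://github.com/KarrLab/wc_lang', branch='master', and revision set to the head
    # commit's SHA1 hash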

    def checkout_commit(self, commit_identifier):
        """ Checkout a commit for this repo

        Use `checkout_commit` carefully. If it checks out a new commit, then other operations on the
        repo may behave differently.

        Args:
            commit_identifier (:obj:`git.objects.commit.Commit` or :obj:`str`): a commit or a commit's hash

        Raises:
            :obj:`MigratorError`: if the commit cannot be checked out
        """
        try:
            commit_hash = GitRepo.get_hash(self.get_commit(commit_identifier))
            # use git directly, as per https://gitpython.readthedocs.io/en/stable/tutorial.html#using-git-directly
            self.repo.git.checkout(commit_hash, detach=True)
        except git.exc.GitError as e:
            raise MigratorError("checkout of '{}' to commit '{}' failed:\n{}".format(self.repo_name(),
                                                                                     commit_hash, e))

    def add_file(self, filename):
        """ Add a file to the index

        Args:
            filename (:obj:`str`): path to new file

        Raises:
            :obj:`MigratorError`: if the file cannot be added
        """
        try:
            self.repo.index.add([filename])
        except (OSError, git.exc.GitError) as e:
            raise MigratorError("adding file '{}' to repo '{}' failed:\n{}".format(
                filename, self.repo_name(), e))

    def commit_changes(self, message):
        """ Commit the changes in this repo

        Args:
            message (:obj:`str`): the commit message

        Raises:
            :obj:`MigratorError`: if the changes cannot be committed
        """
        try:
            self.repo.index.commit(message)
        except (git.exc.GitError, AttributeError) as e:
            raise MigratorError("commiting repo '{}' failed:\n{}".format(self.repo_name(), e))

    def get_dependents(self, commit_or_hash):
        """ Get all commits that depend on a commit, including transitively

        Args:
            commit_or_hash (:obj:`str` or :obj:`git.objects.commit.Commit`): the hash of a commit or a commit
                in the repo

        Returns:
            :obj:`set` of :obj:`git.objects.commit.Commit`: all commits that depend on `commit_or_hash`
        """
        commit = self.get_commit(commit_or_hash)
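        # because commit_DAG edges point from dependent commit to parent commit, networkx's
        # ancestors() returns exactly the commits from which `commit` can be reached, i.e.,
        # the commits that depend on it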
        return ancestors(self.commit_DAG, commit)

    def commits_in_dependency_consistent_seq(self, commits):
        """ Order some git commits into a sequence that's consistent with the repo's dependencies

        Generate a topological sort of the commits.
        If the dependency relationship contains branching, then
        the sequence found is not deterministic, because concurrent nodes
        can appear in any relative order in the sort.
        E.g., in a commit DAG with the paths a -> b -> c and a -> d -> c, nodes
        b and d can appear in either order in the sequence.

        Args:
            commits (:obj:`list` of :obj:`git.objects.commit.Commit`): commits to include in the
                returned sequence

        Returns:
            :obj:`list` of :obj:`git.objects.commit.Commit`: the elements of `commits`, in a sequence
                that's consistent with git commit dependencies in the repository, ordered from
                antecedent to dependent
        """
        seq_with_schema_changes = []
        commits = set(commits)
        for commit in topological_sort(self.commit_DAG):
            if commit in commits:
                seq_with_schema_changes.append(commit)
        seq_with_schema_changes.reverse()
        return seq_with_schema_changes

    def __str__(self):
        """ Provide a string representation

        Returns:
            :obj:`str`: a string representation of this `GitRepo`
        """
        rv = []
        scalar_attrs = ['repo_dir', 'repo_url', 'repo']
        for attr in scalar_attrs:
            val = getattr(self, attr)
            if val:
                rv.append("{}: {}".format(attr, val))
            else:
                rv.append("{}: not initialized".format(attr))

        if self.repo:
            rv.append("latest commit:\n\thash: {}\n\tsummary: {}\n\tUT timestamp: {}".format(
                GitRepo.get_hash(self.head_commit()),
                self.head_commit().summary,
                datetime.datetime.fromtimestamp(self.head_commit().committed_date).strftime('%c')))

        if self.commit_DAG:
            rv.append("commit_DAG (child -> parent):\n\t{}".format(
                "\n\t".join(
                    ["{} -> {}".format(GitRepo.hash_prefix(GitRepo.get_hash(u)),
                                       GitRepo.hash_prefix(GitRepo.get_hash(v)))
                     for (u, v) in self.commit_DAG.edges])))
        else:
            rv.append("commit_DAG: empty")

        if self.git_hash_map:
            rv.append("git_hash_map: {} entries".format(len(self.git_hash_map)))
        else:
            rv.append("git_hash_map: empty")

        if self.temp_dirs:
            rv.append("temp_dirs:\n\t{}".format("\n\t".join(self.temp_dirs)))
        else:
            rv.append("temp_dirs: none used")

        return '\n'.join(rv)


class DataSchemaMigration(object):
    """ Automate the migration of the data files in a repo

    A *data* repo stores the data files that need to be migrated. A *schema* repo contains the
    schemas that provide the data models for these data files. The *data* and *schema* repos may be
    the same repo or two different repos.

    `DataSchemaMigration` uses configuration information in the *data* and *schema* repos to migrate
    data files in the *data* repo to the latest version of the *schema* repo.

    The `data` repo must contain a `migrations` directory that has:

    * Data-schema migration configuration files, written in YAML

    A data-schema migration configuration file contains the attributes described in
    `DataSchemaMigration._CONFIG_ATTRIBUTES`:

    * `files_to_migrate`: a list of files to be migrated
    * `schema_repo_url`: the URL of the `schema` repo
    * `branch`: the branch of the `schema` repo
    * `schema_file`: the relative path of the schema file in the `schema` repo

    The `schema` repo contains a `migrations` directory that contains schema changes files, which
    may refer to associated transformations files. Hashes
    in the changes files must refer to commits in the `schema` repo. These files are managed by
    `SchemaChanges` objects. Migration will not work if changes to the schema are not documented in
    schema changes files.
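
    For example, a data-schema migration config file might contain (illustrative values)::

        files_to_migrate:
        - ../tests/fixtures/core.xlsx
        schema_repo_url: https://github.com/KarrLab/wc_lang
        branch: master
        schema_file: ../wc_lang/core.py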

    Attributes:
        data_repo_location (:obj:`str`): directory or URL of the *data* repo
        data_git_repo (:obj:`GitRepo`): a :obj:`GitRepo` for a git clone of the *data* repo
        schema_git_repo (:obj:`GitRepo`): a :obj:`GitRepo` for a clone of the *schema* repo
        data_config_file_basename (:obj:`str`): the basename of the YAML configuration file for the
            migration, which is stored in the *data* repo's migrations directory
        migration_config_data (:obj:`dict`): the data in the data-schema migration config file
        loaded_schema_changes (:obj:`list`): all validated schema change files
        migration_specs (:obj:`MigrationSpec`): the migration's specification
        io_classes (:obj:`dict` of :obj:`type`, optional): custom schema repo specific reader and/or
            writer for I/O of existing and migrated files, respectively
        git_repos (:obj:`list` of :obj:`GitRepo`): all `GitRepo`\ s created by this `DataSchemaMigration`
    """

    # name of the migrations directory
    _MIGRATIONS_DIRECTORY = 'migrations'

    # template for the name of a data-schema migration config file; the format placeholders are replaced with
    # 1) the name of the data repo, 2) the name of the schema repo, and 3) a datetime value
    # assumes that the filled values do not contain '--'
    _MIGRATION_CONF_NAME_TEMPLATE = 'data_schema_migration_conf--{}--{}--{}.yaml'

    # constant for IO classes management
    CUSTOM_IO_CLASSES_FILE = 'custom_io_classes.py'

    # attributes in the data-schema migration configuration file
    _CONFIG_ATTRIBUTES = {
        # 'name': (type, description, default)
        'files_to_migrate': ('list', 'paths to files in the data repo to migrate', None),
        'schema_repo_url': ('str', 'the URL of the schema repo', None),
        'branch': ('str', "the schema's branch", 'main'),
        'schema_file': ('str', 'the relative path to the schema file in the schema repo', None)
    }

    # attributes in a `DataSchemaMigration`
    _ATTRIBUTES = ['data_repo_location', 'data_git_repo', 'schema_git_repo', 'data_config_file_basename',
                   'migration_config_data', 'loaded_schema_changes', 'migration_specs', 'io_classes', 'git_repos']
    _REQUIRED_ATTRIBUTES = ['data_repo_location', 'data_config_file_basename']

    def __init__(self, **kwargs):
        for attr in self._ATTRIBUTES:
            setattr(self, attr, None)

        # check that all required attributes are provided
        missing_attrs = set(self._REQUIRED_ATTRIBUTES) - set(kwargs)
        if missing_attrs:
            raise MigratorError("initialization of DataSchemaMigration must provide "
                                "DataSchemaMigration._REQUIRED_ATTRIBUTES ({}) but these are missing: {}".format(
                                    self._REQUIRED_ATTRIBUTES, missing_attrs))

        # initialize attributes provided
        for attr, value in kwargs.items():
            setattr(self, attr, value)

        self.git_repos = []

        # get data repo
        self.data_git_repo = GitRepo(self.data_repo_location)
        self.record_git_repo(self.data_git_repo)

        # load data config file
        self.migration_config_data = self.load_config_file(
            os.path.join(self.data_git_repo.migrations_dir(), self.data_config_file_basename))

        # clone and load the schema repo
        self.schema_git_repo = GitRepo(self.migration_config_data['schema_repo_url'],
                                       branch=self.migration_config_data['branch'])
        self.record_git_repo(self.schema_git_repo)

    @staticmethod
    def make_migration_config_file(data_git_repo, schema_repo_name, add_to_repo=True, **kwargs):
        """ Create a data-schema migration config file

        Args:
            data_git_repo (:obj:`GitRepo`): the data git repo that contains the data files to migrate
            schema_repo_name (:obj:`str`): name of the schema repo
            add_to_repo (:obj:`bool`, optional): if set, add the migration config file to the data repo;
                default = :obj:`True`
            kwargs (:obj:`dict`): optional initial values for data-schema migration config file

        Returns:
            :obj:`str`: the pathname to the data-schema migration config file that was written

        Raises:
            :obj:`MigratorError`: if the data-schema migration configuration file already exists
        """
        pathname = os.path.join(data_git_repo.migrations_dir(),
                                DataSchemaMigration.get_name_static(data_git_repo.repo_name(), schema_repo_name))
        if os.path.exists(pathname):
            raise MigratorError("data-schema migration configuration file '{}' already exists".format(pathname))

        # create the migrations directory if it doesn't exist
        os.makedirs(data_git_repo.migrations_dir(), exist_ok=True)

        config_data = {}
        for name, config_attr in DataSchemaMigration._CONFIG_ATTRIBUTES.items():
            attr_type, _, default = config_attr
            default = default or {'list': list, 'str': str}[attr_type]()
            val = kwargs.get(name, default)
            config_data[name] = val

        with open(pathname, 'w') as file:
            file.write(u'# data-schema migration configuration file\n\n')
            # add documentation about the config file attributes
            file.write(u'# description of the attributes:\n')
            for name, config_attr in DataSchemaMigration._CONFIG_ATTRIBUTES.items():
                _, description, _ = config_attr
                file.write("# '{}' contains {}\n".format(name, description))

            # generate YAML content
            file.write(yaml.dump(config_data))

        if add_to_repo:
            # add the data-schema migration configuration file to the data git repo
            data_git_repo.repo.index.add([pathname])
        return pathname
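
    # sketch of direct use, with illustrative values for the config attributes:
    #   config_path = DataSchemaMigration.make_migration_config_file(
    #       data_git_repo, 'wc_lang',
    #       files_to_migrate=['../tests/fixtures/core.xlsx'],
    #       schema_repo_url='https://github.com/KarrLab/wc_lang',
    #       branch='master',
    #       schema_file='../wc_lang/core.py')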

    @staticmethod
    def make_data_schema_migration_conf_file_cmd(data_repo_dir, schema_file_url, files_to_migrate,
                                                 add_to_repo=True):
        """ Make a data-schema migration configuration file from CLI input

        Args:
            data_repo_dir (:obj:`str`): directory of the data repo
            schema_file_url (:obj:`str`): URL for schema's Python file
            files_to_migrate (:obj:`list` of :obj:`str`): data files to migrate
            add_to_repo (:obj:`bool`, optional): if set, add the migration config file to the data repo;
                default = :obj:`True`

        Returns:
            :obj:`str`: pathname of the schema changes file that was written

        Raises:
            :obj:`MigratorError`: if `data_repo_dir` isn't the directory of a repo, or
                `schema_file_url` isn't the URL of a schema file, or
                `files_to_migrate` aren't files
        """
        data_repo_dir = os.path.abspath(data_repo_dir)
        """ convert schema_file_url into URL for schema repo & relative path to schema's Python file """
        migration_config_file_kwargs = {}
        parsed_url = urlparse(schema_file_url)
        # error checks
        form = "scheme://git_website/organization/repo_name/'blob'/branch/relative_pathname"
        error_msg = "schema_file_url must be URL for python schema file, of the form: {}".format(form)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise MigratorError(error_msg)
        elements = parsed_url.path.split('/')
        if len(elements) < 6 or not schema_file_url.endswith('.py'):
            raise MigratorError(error_msg)
        schema_repo_url = parsed_url.scheme + '://' + parsed_url.netloc + '/' + '/'.join(elements[1:3])
        migration_config_file_kwargs['schema_repo_url'] = schema_repo_url
        schema_repo_name = elements[2]
        branch = elements[4]
        migration_config_file_kwargs['branch'] = branch
        # strip first two elements from schema_path: 'blob'/branch
        schema_file = '/'.join(elements[5:])
        migration_config_file_kwargs['schema_file'] = os.path.join('..', schema_file)

        """ extract URL of data repo from local repo clone """
        if not os.path.isdir(data_repo_dir):
            raise MigratorError("data_repo_dir is not a directory: '{}'".format(data_repo_dir))
        git_repo = GitRepo(data_repo_dir)

        """ convert each data_file into path relative to migrations dir of data repo """
        data_repo_root_dir = git_repo.repo_dir
        converted_data_files = []
        errors = []
        for data_file in files_to_migrate:
            # get full path
            normalized_data_file = normalize_filename(data_file, dir=data_repo_dir)
            if not os.path.isfile(normalized_data_file):
                errors.append("cannot find data file: '{}'".format(data_file))
                continue
            # get path relative to migrations dir
            relative_path = str(PurePath(normalized_data_file).relative_to(data_repo_root_dir))
            relative_path = os.path.join('..', relative_path)
            converted_data_files.append(relative_path)
        if errors:
            raise MigratorError('\n'.join(errors))
        migration_config_file_kwargs['files_to_migrate'] = converted_data_files

        """ create data-schema migration config file """
        config_file_path = DataSchemaMigration.make_migration_config_file(
            git_repo,
            schema_repo_name,
            add_to_repo=add_to_repo,
            **migration_config_file_kwargs)
        return config_file_path

    @staticmethod
    def load_config_file(data_schema_migration_conf_file):
        """ Load a data-schema migration config file

        Args:
            data_schema_migration_conf_file (:obj:`str`): path to the data-schema migration config file

        Returns:
            :obj:`dict`: the data in the data-schema migration config file

        Raises:
            :obj:`MigratorError`: if the data-schema migration config file cannot be found,
                or is not proper YAML,
                or does not have the right format,
                or does not contain any data
        """
        try:
            with open(data_schema_migration_conf_file, 'r') as fd:
                data_schema_migration_conf = yaml.load(fd, Loader=yaml.FullLoader)
        except FileNotFoundError:
            raise MigratorError("could not read data-schema migration config file: '{}'".format(
                data_schema_migration_conf_file))
        except yaml.YAMLError as e:
            raise MigratorError("could not parse YAML data-schema migration config file: '{}':\n{}".format(
                data_schema_migration_conf_file, e))

        errors = []
        if not isinstance(data_schema_migration_conf, dict):
            errors.append('YAML file does not contain a dictionary')
        else:
            for attr_name in DataSchemaMigration._CONFIG_ATTRIBUTES:
                if attr_name not in data_schema_migration_conf:
                    errors.append("missing DataSchemaMigration._CONFIG_ATTRIBUTES attribute: '{}'".format(
                        attr_name))
                elif not data_schema_migration_conf[attr_name]:
                    # all attributes must be initialized
                    errors.append("uninitialized DataSchemaMigration._CONFIG_ATTRIBUTES attribute: '{}'".format(
                        attr_name))
        if errors:
            raise MigratorError("invalid data-schema migration config file: '{}'\n{}".format(
                data_schema_migration_conf_file, '\n'.join(errors)))

        return data_schema_migration_conf

    # todo: migrator: direct unittest
    def get_schema_package(self):
        """ Obtain the name of the schema package from the schema file

        Returns:
            :obj:`str`: the name of the schema's package, given as its relative path in the schema repo
        """
        normalized_schema_file = normalize_filename(self.migration_config_data['schema_file'],
                                                    dir=self.schema_git_repo.migrations_dir())
        rel_path = Path(normalized_schema_file).relative_to(self.schema_git_repo.repo_dir)
        return str(rel_path.parent)

    def record_git_repo(self, git_repo):
        """ Record a new :obj:`GitRepo`: so that its temp dir can be deleted later

        Args:
            git_repo (:obj:`GitRepo`): a git repo
        """
        self.git_repos.append(git_repo)

    def clean_up(self):
        """ Delete the temp dirs used by this `DataSchemaMigration`'s git repos
        """
        for git_repo in self.git_repos:
            git_repo.del_temp_dirs()

    def validate(self):
        """ Validate files to migrate, and load all schema changes files

        Raises:
            :obj:`MigratorError`: if any files to migrate do not exist,
                or all schema changes files cannot be loaded
        """
        errors = []
        expanded_files_to_migrate = []
        for file in self.migration_config_data['files_to_migrate']:
            abs_path = normalize_filename(file, dir=self.data_git_repo.migrations_dir())
            if not os.path.isfile(abs_path):
                errors.append("file to migrate '{}', with full path '{}', doesn't exist".format(file, abs_path))
            else:
                expanded_files_to_migrate.append(abs_path)
        if errors:
            raise MigratorError('\n'.join(errors))
        self.migration_config_data['files_to_migrate'] = expanded_files_to_migrate

        # load all schema changes files & make sure each one corresponds to a hash
        errors, self.loaded_schema_changes = SchemaChanges.all_schema_changes_with_commits(self.schema_git_repo)
        if errors:
            raise MigratorError('\n'.join(errors))

    def get_name(self):
        """ Make a timestamped name for a data-schema migration config file

        Returns:
            :obj:`str`: the name

        Raises:
            :obj:`MigratorError`: if either the data or the schema git repo are not initialized
        """
        if self.data_git_repo is None or self.schema_git_repo is None:
            raise MigratorError("To run get_name() data_git_repo and schema_git_repo must be initialized")
        return self.get_name_static(self.data_git_repo.repo_name(), self.schema_git_repo.repo_name())

    @staticmethod
    def get_name_static(data_repo_name, schema_repo_name):
        """ Make a timestamped name for a data-schema migration config file

        Args:
            data_repo_name (:obj:`str`): name of the data repo
            schema_repo_name (:obj:`str`): name of the schema repo

        Returns:
            :obj:`str`: the name
        """
        return DataSchemaMigration._MIGRATION_CONF_NAME_TEMPLATE.format(data_repo_name,
                                                                        schema_repo_name, SchemaChanges.get_date_timestamp())

    def get_data_file_git_commit_hash(self, data_file):
        """ Get the git commit hash of the schema repo that describes a data file

        Args:
            data_file (:obj:`str`): pathname of a data file

        Returns:
            :obj:`str`: the hash

        Raises:
            :obj:`MigratorError`: if `data_file` does not contain a schema repo metadata model
        """
        try:
            metadata = utils.read_metadata_from_file(data_file)
        except Exception as e:
            raise MigratorError("Cannot get schema repo commit hash for '{}':\n{}".format(data_file, e))
        if metadata.schema_repo_metadata:
            return metadata.schema_repo_metadata.revision
        else:
            raise MigratorError("No schema repo commit hash in '{}'".format(data_file))

    def generate_migration_spec(self, data_file, schema_changes):
        """ Generate a `MigrationSpec` from a sequence of schema changes

        The migration will migrate `data_file` in place.

        Args:
            data_file (:obj:`str`): the existing data file that will be migrated
            schema_changes (:obj:`list` of :obj:`SchemaChanges`): a sequence of schema changes instances

        Returns:
            :obj:`MigrationSpec`: a `MigrationSpec` that specifies a migration of the file through
                the sequence of schema changes

        Raises:
            :obj:`MigratorError`: if the `MigrationSpec` that's created doesn't validate
        """
        spec_args = {}
        spec_args['name'] = "{}:{}".format(data_file, self.get_name())
        spec_args['existing_files'] = [data_file]

        # get the schema for the data file
        commit_hash = self.get_data_file_git_commit_hash(data_file)
        git_repo = self.schema_git_repo.copy()
        self.record_git_repo(git_repo)
        git_repo.checkout_commit(commit_hash)
        normalized_schema_file = normalize_filename(self.migration_config_data['schema_file'],
                                                    dir=git_repo.migrations_dir())
        spec_args['schema_files'] = [normalized_schema_file]

        spec_args['seq_of_renamed_models'] = []
        spec_args['seq_of_renamed_attributes'] = []
        spec_args['seq_of_transformations'] = []
        for schema_change in schema_changes:
            # make a copy of each schema commit
            git_repo = self.schema_git_repo.copy()
            self.record_git_repo(git_repo)
            git_repo.checkout_commit(schema_change.commit_hash)
            schema_file = normalize_filename(self.migration_config_data['schema_file'], dir=git_repo.migrations_dir())
            spec_args['schema_files'].append(schema_file)
            spec_args['seq_of_renamed_models'].append(schema_change.renamed_models)
            spec_args['seq_of_renamed_attributes'].append(schema_change.renamed_attributes)
            spec_args['seq_of_transformations'].append(schema_change.transformations)

        spec_args['io_classes'] = self.io_classes
        spec_args['final_schema_git_metadata'] = git_repo.get_metadata(SchemaRepoMetadata)
        spec_args['migrate_in_place'] = True
        migration_spec = MigrationSpec(**spec_args)
        migration_spec.prepare()
        return migration_spec

    def schema_changes_for_data_file(self, data_file):
        """ Generate a sequence of `SchemaChanges` for migrating a data file

        Args:
            data_file (:obj:`str`): a data file in the data git repo

        Returns:
            :obj:`list`: a sequence of `SchemaChanges` for migrating a data file
        """
        # get the schema for the data file
        commit_hash = self.get_data_file_git_commit_hash(data_file)

        # make a SchemaChanges for each schema change in the file's schema dependents
        hashes_of_schema_changes = [sc.commit_hash for sc in self.loaded_schema_changes]
        hashes_of_dependents = \
            [GitRepo.get_hash(dependent) for dependent in self.schema_git_repo.get_dependents(commit_hash)]
        hashes_of_dependent_schema_changes = set(hashes_of_schema_changes).intersection(hashes_of_dependents)
        commits = self.schema_git_repo.get_commits(hashes_of_dependent_schema_changes)

        # create a SchemaChanges for each migration of data_file
        seq_of_schema_changes = []
        for commit in self.schema_git_repo.commits_in_dependency_consistent_seq(commits):
            schema_changes_file = SchemaChanges.find_file(self.schema_git_repo, GitRepo.get_hash(commit))
            seq_of_schema_changes.append(SchemaChanges.generate_instance(schema_changes_file))
        return seq_of_schema_changes

    def import_custom_IO_classes(self, io_classes_file_basename=None):
        """ If the schema repo has an IO classes file, import custom IO classes for accessing data files

        Args:
            io_classes_file_basename (:obj:`str`, optional): custom basename for the custom IO classes file,
                which overrides `CUSTOM_IO_CLASSES_FILE`

        Returns:
            :obj:`dict` or :obj:`None`: map from 'reader' and/or 'writer' to IO classes, or
                :obj:`None` if a custom IO classes file doesn't exist

        Raises:
            :obj:`MigratorError`: if the custom IO classes file cannot be imported,
                or if `schema_repo_dir` is deleted from `sys.path` while importing it,
                or neither Reader nor Writer is defined in it
        """
        if not io_classes_file_basename:
            io_classes_file_basename = self.CUSTOM_IO_CLASSES_FILE
        if not io_classes_file_basename.endswith('.py'):
            raise MigratorError("custom IO classes file '{}' not a Python file".format(
                io_classes_file_basename))
        schema_repo_dir = self.schema_git_repo.repo_dir
        schema_repo_custom_io_classes_file = os.path.join(schema_repo_dir, self.get_schema_package(),
                                                          'migrations', io_classes_file_basename)
        if os.path.isfile(schema_repo_custom_io_classes_file):

            schema_module = SchemaModule(schema_repo_custom_io_classes_file)
            imported_module = schema_module.import_module_for_migration(validate=False)

            io_classes = {}
            if hasattr(imported_module, 'Reader'):
                io_classes['reader'] = getattr(imported_module, 'Reader')
            if hasattr(imported_module, 'Writer'):
                io_classes['writer'] = getattr(imported_module, 'Writer')
            if not io_classes:
                raise MigratorError("neither Reader or Writer defined in '{}'".format(
                    schema_repo_custom_io_classes_file))
            return io_classes
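
    # a minimal custom_io_classes.py in a schema repo's migrations directory might contain just
    # (hypothetical sketch; wc_lang is an example schema repo):
    #   from wc_lang.io import Reader
    # which would make the schema repo's custom Reader available as io_classes['reader']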

    def prepare(self):
        """ Prepare for migration

        * Validate this `DataSchemaMigration`
        * Clone each schema version specified by a schema change
        * Generate and prepare :obj:`MigrationSpec` instances for the migration, one for each file

        Raises:
            :obj:`MigratorError`: if the `DataSchemaMigration` doesn't validate
        """
        self.validate()
        self.migration_specs = []
        self.io_classes = self.import_custom_IO_classes()
        for file_to_migrate in self.migration_config_data['files_to_migrate']:
            schema_changes = self.schema_changes_for_data_file(file_to_migrate)
            migration_spec = self.generate_migration_spec(file_to_migrate, schema_changes)
            self.migration_specs.append(migration_spec)

    def automated_migrate(self, tmp_dir=None):
        """ Migrate the *data* repo's data files

        Migrate to the latest commit referenced by a schema changes file in the *schema* repo, and
        migrate data files in place.
        If the *data* repo passed to `DataSchemaMigration` was a directory, then the migrated
        data files will be stored in that directory.

        Args:
            tmp_dir (:obj:`str`, optional): if the data repo passed to `DataSchemaMigration` was a URL,
                then the migrated files will be returned in a temporary directory.
                If `tmp_dir` is provided then it will contain the migrated files; if not, then
                a temporary directory is created to hold them, and the caller is responsible for
                deleting it.

        Returns:
            :obj:`tuple` of :obj:`list`, :obj:`str`: the migrated files, and the value of `tmp_dir`
        """
        self.prepare()
        # migrate
        all_migrated_files = []
        for migration_spec in self.migration_specs:
            migrated_filenames = MigrationController.migrate_from_spec(migration_spec)
            single_migrated_file = migrated_filenames[0]
            all_migrated_files.append(single_migrated_file)

        if not os.path.isdir(self.data_repo_location):
            # data repo is in a temp dir made by GitRepo.clone_repo_from_url -- copy migrated files to tmp_dir
            dest_files = []
            if tmp_dir is None:
                tmp_dir = tempfile.mkdtemp()
            for migrated_file in all_migrated_files:
                # migrated_file is relative to self.data_git_repo.repo_dir
                relative_migrated_file = os.path.relpath(migrated_file, self.data_git_repo.repo_dir)
                dest = os.path.join(tmp_dir, relative_migrated_file)
                dest_files.append(dest)
                os.makedirs(os.path.dirname(dest), exist_ok=True)
                # copy migrated_file
                shutil.copyfile(migrated_file, dest)
        self.clean_up()

        # return the paths of the migrated files
        if not os.path.isdir(self.data_repo_location):
            return dest_files, tmp_dir
        else:
            return all_migrated_files, tmp_dir
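
    # sketch of a typical invocation for a local data repo clone (the config file basename is
    # hypothetical):
    #   migration = DataSchemaMigration(data_repo_location='.',
    #       data_config_file_basename='data_schema_migration_conf--wc_sim--wc_lang--...yaml')
    #   migrated_files, _ = migration.automated_migrate()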

    def verify_schemas(self):
        """ Verify that each schema can be independently imported

        It can be difficult to import a schema via `importlib.import_module()` in
        `import_module_for_migration()`. This method tests that proactively.

        Returns:
            :obj:`list` of :obj:`str`: all errors obtained
        """
        self.prepare()
        errors = []
        for migration_spec in self.migration_specs:
            for schema_file in migration_spec.schema_files:
                try:
                    SchemaModule(schema_file).import_module_for_migration()
                except MigratorError as e:
                    errors.append("cannot import: '{}'\n\t{}".format(schema_file, e))
        return errors

    @staticmethod
    def migrate_files(schema_url, local_dir, data_files):
        """ Migrate some data files specified by command line input

        Migrate data files in place in a local repository.

        Args:
            schema_url (:obj:`str`): URL of the schema's Python file
            local_dir (:obj:`str`): directory in a local data repo that contains the data files
            data_files (:obj:`list` of :obj:`str`): data files to migrate

        Returns:
            :obj:`list` of :obj:`str`: list of pathnames of migrated files

        Raises:
            :obj:`MigratorError`: if `schema_url` isn't in the right form, or
                `local_dir` isn't a directory, or
                any of the data files cannot be found, or
                the migration fails
        """
        local_dir = os.path.abspath(local_dir)
        config_file_path = DataSchemaMigration.make_data_schema_migration_conf_file_cmd(local_dir,
                                                                                        schema_url, data_files, add_to_repo=False)

        # create and run DataSchemaMigration
        data_schema_migration = DataSchemaMigration(data_repo_location=local_dir,
                                                    data_config_file_basename=os.path.basename(config_file_path))
        migrated_files, _ = data_schema_migration.automated_migrate()
        # delete the temporary data-schema migration config file
        remove_silently(config_file_path)
        return migrated_files
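
    # e.g. (hypothetical arguments):
    #   DataSchemaMigration.migrate_files(
    #       'https://github.com/KarrLab/wc_lang/blob/master/wc_lang/core.py',
    #       '.', ['tests/fixtures/core.xlsx'])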

    def test_migration(self):  # pragma: no cover
        """ Test a migration

        The trickiest part of a migration is importing the schema. Imports that work with the
        Python `import` statement may nevertheless fail in migration's import, which uses the
        library call `importlib.import_module`. This method should be called whenever a schema
        that may be migrated is changed.

        This method reports:

        * any validation errors in automatic config files
        * any validation errors in schema changes files
        * any errors in transformations
        * any failures to import schemas

        It does not alter any files.
        """
        # todo: ensure that all of these are OK:
        #   automatic config files, schema changes files, transformations, and schemas
        pass

    def __str__(self):
        """ Provide a string representation

        Returns:
            :obj:`str`: a string representation of this `DataSchemaMigration`
        """
        rv = []
        for attr in self._ATTRIBUTES:
            rv.append("{}: {}".format(attr, getattr(self, attr)))
        return '\n'.join(rv)


class Utils(object):    # pragma: no cover
    """ Utilities for migration """

    @staticmethod
    def find_schema_modules():
        """ Find the modules used by a schema

        Useful for creating schema changes files for a schema repo

        Parses its arguments from the command line, and prints the modules found
        """
        parser = argparse.ArgumentParser(
            description="Find the modules used by a schema, and their commits")
        parser.add_argument('schema_path', help="path to a Python schema")
        parser.add_argument('repo', help="name of the repo")
        parser.add_argument('-p', '--paths', help="generate list of paths", action="store_true")
        args = parser.parse_args()
        finder = ModuleFinder()
        finder.run_script(args.schema_path)
        if not args.paths:
            max_name = 0
            max_path = 0
            for name, module in finder.modules.items():
                if module.__file__ and module.__file__.startswith(args.repo):
                    max_name = max(max_name, len(module.__name__))
                    max_path = max(max_path, len(module.__file__))
            custom_format = "{{:<{}}}{{:<{}}}".format(max_name + 4, max_path)
            missing, maybe = finder.any_missing_maybe()
            wc_missing = [name for name in missing if 'wc_' in name]
            if wc_missing:
                print('wc_missing:')
                for name in wc_missing:
                    print(name)
            else:
                print('no wc_missing')
            wc_maybe = [name for name in maybe if 'wc_' in name]
            if wc_maybe:
                print('wc_maybe:')
                for name in wc_maybe:
                    print(name)
            else:
                print('no wc_maybe')

            print(custom_format.format('module name', 'module filename'))
            for name, module in finder.modules.items():
                if module.__file__ and module.__file__.startswith(args.repo):
                    print(custom_format.format(module.__name__, module.__file__))
        else:
            paths = []
            for module in finder.modules.values():
                if module.__file__ and module.__file__.startswith(args.repo):
                    paths.append(module.__file__)
            print(' '.join(paths))


# todo: add a controller that tests all migrations configured in data-schema migration config files
class CementControllers(object):
    """ Cement Controllers for cement CLIs in data and schema repos involved with migrating files

    Because these controllers are used by multiple schema and data repos, they're defined here and
    imported into the `__main__.py` modules of schema repos that use `ObjTables` to define data
    schemas, and of data repos that contain data files to migrate. `wc_lang` is an example schema
    repo. `wc_sim` is an example data repo that contains data files whose schema is defined in
    `wc_lang`.
    """

    class SchemaChangesTemplateController(Controller):
        """ Create a template schema changes file

        This controller is used by schema repos.
        """

        class Meta:
            label = 'make-changes-template'
            description = 'Create a template schema changes file'
            help = 'Create a template schema changes file'
            stacked_on = 'base'
            stacked_type = 'nested'
            arguments = [
                (['--schema_repo_dir'],
                    {'type': str,
                        'help': "path of the directory of the schema's repository; defaults to the "
                     "current directory",
                        'default': '.'}),
                (['--commit'],
                    {'type': str,
                        'help': "hash of a commit containing the changes; default is most recent commit"}),
            ]

        @cement.ex(hide=True)
        def _default(self):
            """ Make a template schema changes file in the schema repo

            Outputs the path of the template schema changes file that's created, or error(s) produced.
            """
            args = self.app.pargs
            schema_changes_template_file = SchemaChanges.make_template_command(args.schema_repo_dir,
                                                                               commit_hash=args.commit)
            # print directions to complete creating, commit & push the schema changes template file
            print("Created and added template schema changes file: '{}'.".format(schema_changes_template_file))
            print("Describe the schema changes in the file's attributes (renamed_attributes, "
                  "renamed_models, and transformations_file). Then commit and push it with git.")

    class DataSchemaMigrationConfigController(Controller):
        """ Create a data-schema migration configuration file.

        This controller is used by data repos.
        """

        class Meta:
            label = 'make-data-schema-migration-config-file'
            description = 'Create a data-schema migration configuration file'
            help = 'Create a data-schema migration configuration file'
            stacked_on = 'base'
            stacked_type = 'nested'
            arguments = [
                (['--data_repo_dir'],
                    {'type': str,
                        'help': "path of the directory of the repository storing the data file(s) to migrate; "
                     "defaults to the current directory",
                        'default': '.'}),
                (['schema_url'], {'type': str,
                                  'help': 'URL of the schema in its git repository, including the branch'}),
                (['file_to_migrate'],
                    dict(action='store', type=str, nargs='+', help='a file to migrate')),
            ]

        @cement.ex(hide=True)
        def _default(self):
            args = self.app.pargs
            # args.file_to_migrate is a list of all files to migrate
            migration_config_file = DataSchemaMigration.make_data_schema_migration_conf_file_cmd(
                args.data_repo_dir, args.schema_url, args.file_to_migrate)
            # print directions to commit & push the data-schema migration config file
            print("Data-schema migration config file created: '{}'".format(migration_config_file))
            print("Commit and push it with git.")

    class MigrateController(Controller):
        """ Perform a migration configured by a data-schema migration config file

        This controller is used by data repos.
        """

        class Meta:
            label = 'do-configured-migration'
            description = 'Migrate data file(s) as configured in a data-schema migration configuration file'
            help = 'Migrate data file(s) as configured in a data-schema migration configuration file'
            stacked_on = 'base'
            stacked_type = 'nested'
            arguments = [
                (['migration_config_file'],
                    {'type': str, 'help': 'name of the data-schema migration configuration file to use'})
            ]

        @cement.ex(hide=True)
        def _default(self):
            args = self.app.pargs
            migration_config_basename = Path(args.migration_config_file).name
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=MigrateWarning)
                data_schema_migration = DataSchemaMigration(
                    **dict(data_repo_location='.', data_config_file_basename=migration_config_basename))
                migrated_files, _ = data_schema_migration.automated_migrate()

                for migrated_file in migrated_files:
                    print("'{}' migrated in place".format(migrated_file))

    class MigrateFileController(Controller):
        """ Migrate specified data file(s)

        This controller is used by data repos.
        """

        class Meta:
            label = 'migrate-data'
            description = 'Migrate specified data file(s)'
            help = 'Migrate specified data file(s)'
            stacked_on = 'base'
            stacked_type = 'nested'
            arguments = [
                (['--data_repo_dir'],
                    {'type': str,
                        'help': "path of the directory of the repository storing the data file(s) to migrate; "
                     "defaults to the current directory",
                        'default': '.'}),
                (['schema_url'], {'type': str,
                                  'help': 'URL of the schema in its git repository, including the branch'}),
                (['file_to_migrate'],
                    dict(action='store', type=str, nargs='+',
                         help='a file to migrate')),
            ]

        @cement.ex(hide=True)
        def _default(self):
            args = self.app.pargs
            # args.file_to_migrate is a list of all files to migrate
            migrated_files = DataSchemaMigration.migrate_files(args.schema_url, args.data_repo_dir,
                                                               args.file_to_migrate)
            print('migrated files:')
            for migrated_file in migrated_files:
                print(migrated_file)


# controllers that can be added to the __main__.App.Meta.handlers in a data repo; e.g., see wc_sim
data_repo_migration_controllers = [
    CementControllers.DataSchemaMigrationConfigController,
    CementControllers.MigrateController,
    CementControllers.MigrateFileController
]


# controllers that can be added to the __main__.App.Meta.handlers in a schema repo
schema_repo_migration_controllers = [
    CementControllers.SchemaChangesTemplateController
]
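
# sketch: a repo's __main__.py might register these controllers in its cement App
# (App and BaseController are hypothetical names):
#
#   import cement
#   from obj_tables.migrate import data_repo_migration_controllers
#
#   class App(cement.App):
#       class Meta:
#           label = 'my_data_repo'
#           handlers = [BaseController] + data_repo_migration_controllers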


class VirtualEnvUtil(object):   # pragma: no cover
    # INCOMPLETE: started and not finished; not tested
    # NEEDS:
    # from virtualenvapi.manage import VirtualEnvironment
    # import virtualenvapi
    """ Support creation, use and distruction of virtual environments for Python packages

    Will be used to allow different schema versions to depend on different package versions

    Attributes:
        name (:obj:`str`): name of the `VirtualEnvUtil`
    """

    def __init__(self, name, dir=None):
        """ Initialize a `VirtualEnvUtil`

        Args:
            name (:obj:`str`): name for the `VirtualEnvUtil`
            dir (:obj:`str`, optional): a directory to hold the `VirtualEnvUtil`
        """
        if re.search(r'\s', name):
            raise ValueError("name '{}' may not contain whitespace".format(name))
        self.name = name
        if dir is None:
            dir = tempfile.mkdtemp()
        self.virtualenv_dir = os.path.join(dir, name)
        if os.path.isdir(self.virtualenv_dir):
            raise ValueError("directory '{}' already exists".format(self.virtualenv_dir))
        os.mkdir(self.virtualenv_dir)
        self.env = VirtualEnvironment(self.virtualenv_dir)

    def is_installed(self, pip_spec):
        return self.env.is_installed(pip_spec)

    def install_from_pip_spec(self, pip_spec):
        """ Install a package from a `pip` specification

        Args:
            pip_spec (:obj:`str`): a `pip` specification for a package to load

        If the package described by `pip_spec` cannot be installed, prints details of the
        `PackageInstallationException` instead of raising it
        """
        try:
            self.env.install(pip_spec)
        except virtualenvapi.exceptions.PackageInstallationException as e:
            print('returncode', e.returncode)
            print('output', e.output)
            print('package', e.package)

    def activate(self):
        """ Use this `VirtualEnvUtil`
        """
        # put the env on sys.path
        pass

    def deactivate(self):
        """ Stop using this `VirtualEnvUtil`
        """
        # remove this env from sys.path
        pass

    def destroy(self):
        """ Destroy this `VirtualEnvUtil`

        Destruction deletes the directory storing the `VirtualEnvUtil`
        """
        shutil.rmtree(self.virtualenv_dir)

    def destroyed(self):
        """ Test whether this `VirtualEnvUtil` has been destroyed
        """
        return not os.path.isdir(self.virtualenv_dir)