django/django

View on GitHub
django/db/backends/base/schema.py

Summary

Maintainability
F
1 wk
Test Coverage
import logging
import operator
from datetime import datetime

from django.conf import settings
from django.db.backends.ddl_references import (
    Columns,
    Expressions,
    ForeignKeyName,
    IndexName,
    Statement,
    Table,
)
from django.db.backends.utils import names_digest, split_identifier, truncate_name
from django.db.models import NOT_PROVIDED, Deferrable, Index
from django.db.models.sql import Query
from django.db.transaction import TransactionManagementError, atomic
from django.utils import timezone

logger = logging.getLogger("django.db.backends.schema")


def _is_relevant_relation(relation, altered_field):
    """
    When altering the given field, must constraints on its model from the given
    relation be temporarily dropped?
    """
    field = relation.field
    if field.many_to_many:
        # M2M reverse field
        return False
    if altered_field.primary_key and field.to_fields == [None]:
        # Foreign key constraint on the primary key, which is being altered.
        return True
    # Is the constraint targeting the field being altered?
    return altered_field.name in field.to_fields


def _all_related_fields(model):
    # Related fields must be returned in a deterministic order.
    return sorted(
        model._meta._get_fields(
            forward=False,
            reverse=True,
            include_hidden=True,
            include_parents=False,
        ),
        key=operator.attrgetter("name"),
    )


def _related_non_m2m_objects(old_field, new_field):
    """
    Yield (old_relation, new_relation) pairs for the relevant non-M2M reverse
    relations of the two fields' models, then recurse into the remote field
    of each pair.
    """

    def relevant_relations(altered_field):
        # Lazily keep only the reverse relations affected by the alteration.
        return (
            relation
            for relation in _all_related_fields(altered_field.model)
            if _is_relevant_relation(relation, altered_field)
        )

    pairs = zip(relevant_relations(old_field), relevant_relations(new_field))
    for old_rel, new_rel in pairs:
        yield old_rel, new_rel
        # Relations pointing at the related field are affected transitively.
        yield from _related_non_m2m_objects(
            old_rel.remote_field,
            new_rel.remote_field,
        )


class BaseDatabaseSchemaEditor:
    """
    This class and its subclasses are responsible for emitting schema-changing
    statements to the databases - model creation/removal/alteration, field
    renaming, index fiddling, and so on.
    """

    # Overrideable SQL templates
    #
    # Each template is a %-style format string with named placeholders; the
    # methods of this class fill them in with a dict of values.

    # Table-level statements.
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_retablespace_table = "ALTER TABLE %(table)s SET TABLESPACE %(new_tablespace)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # Column-level statements. The "ALTER COLUMN ..." fragments have no
    # "ALTER TABLE" prefix of their own; add_field() wraps such a fragment in
    # sql_alter_column through its "changes" placeholder.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    # Same SQL as sql_alter_column_no_default unless a subclass overrides it.
    sql_alter_column_no_default_null = sql_alter_column_no_default
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
    )

    # Constraint fragments; sql_check_constraint is appended to column
    # definitions by table_sql() and add_field().
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    # Check constraints; deletion reuses the generic constraint drop.
    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    # Unique constraints; deletion reuses the generic constraint drop.
    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE%(nulls_distinct)s (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    # Foreign keys.
    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    # When set, table_sql() appends this to the column definition instead of
    # deferring a separate foreign key statement.
    sql_create_inline_fk = None
    # When set, add_field() appends this to the new column's definition
    # instead of deferring a separate foreign key statement.
    sql_create_column_inline_fk = None
    sql_delete_fk = sql_delete_constraint

    # Indexes.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(nulls_distinct)s%(condition)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX %(name)s"

    # Primary keys; deletion reuses the generic constraint drop.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint

    sql_delete_procedure = "DROP PROCEDURE %(procedure)s"

    # Comments; alter_db_table_comment() does nothing when
    # sql_alter_table_comment is falsy.
    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"

    def __init__(self, connection, collect_sql=False, atomic=True):
        self.connection = connection
        self.collect_sql = collect_sql
        if self.collect_sql:
            self.collected_sql = []
        self.atomic_migration = self.connection.features.can_rollback_ddl and atomic

    # State-managing methods

    def __enter__(self):
        self.deferred_sql = []
        if self.atomic_migration:
            self.atomic = atomic(self.connection.alias)
            self.atomic.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            for sql in self.deferred_sql:
                self.execute(sql)
        if self.atomic_migration:
            self.atomic.__exit__(exc_type, exc_value, traceback)

    # Core utility functions

    def execute(self, sql, params=()):
        """
        Run one SQL statement with optional parameters, or record it in
        `self.collected_sql` when SQL is being collected.
        """
        # The transactional DDL check is pointless when merely collecting SQL,
        # since nothing is going to be executed.
        ddl_in_unsafe_transaction = (
            not self.collect_sql
            and self.connection.in_atomic_block
            and not self.connection.features.can_rollback_ddl
        )
        if ddl_in_unsafe_transaction:
            raise TransactionManagementError(
                "Executing DDL statements while in a transaction on databases "
                "that can't perform a rollback is prohibited."
            )
        # Statement objects are rendered to plain strings first.
        sql = str(sql)
        logger.debug(
            "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
        )
        if not self.collect_sql:
            with self.connection.cursor() as cursor:
                cursor.execute(sql, params)
            return
        # Collected statements always end with exactly one semicolon.
        terminator = "" if sql.rstrip().endswith(";") else ";"
        if params is None:
            rendered = sql
        else:
            rendered = sql % tuple(self.quote_value(param) for param in params)
        self.collected_sql.append(rendered + terminator)

    def quote_name(self, name):
        return self.connection.ops.quote_name(name)

    def table_sql(self, model):
        """
        Take a model and return its table definition.

        Return a (sql, params) pair: the CREATE TABLE statement built from
        sql_create_table and the parameters collected from the column
        definitions. As a side effect, append to self.deferred_sql the
        statements that have to run after the table exists: unique_together
        constraints, non-inline foreign keys, and backend autoincrement SQL.
        """
        # Add any unique_togethers (always deferred, as some fields might be
        # created afterward, like geometry fields with some backends).
        for field_names in model._meta.unique_together:
            fields = [model._meta.get_field(field) for field in field_names]
            self.deferred_sql.append(self._create_unique_sql(model, fields))
        # Create column SQL, add FK deferreds if needed.
        column_sqls = []
        params = []
        for field in model._meta.local_fields:
            # SQL.
            definition, extra_params = self.column_sql(model, field)
            # column_sql() returns None for fields without a column type.
            if definition is None:
                continue
            # Check constraints can go on the column SQL here.
            db_params = field.db_parameters(connection=self.connection)
            if db_params["check"]:
                definition += " " + self.sql_check_constraint % db_params
            # Autoincrement SQL (for backends with inline variant).
            col_type_suffix = field.db_type_suffix(connection=self.connection)
            if col_type_suffix:
                definition += " %s" % col_type_suffix
            params.extend(extra_params)
            # FK.
            if field.remote_field and field.db_constraint:
                to_table = field.remote_field.model._meta.db_table
                to_column = field.remote_field.model._meta.get_field(
                    field.remote_field.field_name
                ).column
                # Prefer the inline form when the backend defines one;
                # otherwise defer a separate statement, if FKs are supported.
                if self.sql_create_inline_fk:
                    definition += " " + self.sql_create_inline_fk % {
                        "to_table": self.quote_name(to_table),
                        "to_column": self.quote_name(to_column),
                    }
                elif self.connection.features.supports_foreign_keys:
                    self.deferred_sql.append(
                        self._create_fk_sql(
                            model, field, "_fk_%(to_table)s_%(to_column)s"
                        )
                    )
            # Add the SQL to our big list.
            column_sqls.append(
                "%s %s"
                % (
                    self.quote_name(field.column),
                    definition,
                )
            )
            # Autoincrement SQL (for backends with post table definition
            # variant).
            if field.get_internal_type() in (
                "AutoField",
                "BigAutoField",
                "SmallAutoField",
            ):
                autoinc_sql = self.connection.ops.autoinc_sql(
                    model._meta.db_table, field.column
                )
                if autoinc_sql:
                    self.deferred_sql.extend(autoinc_sql)
        # Constraints declared on the model's Meta are rendered inline.
        constraints = [
            constraint.constraint_sql(model, self)
            for constraint in model._meta.constraints
        ]
        # Columns come first, then constraints; falsy entries are dropped.
        sql = self.sql_create_table % {
            "table": self.quote_name(model._meta.db_table),
            "definition": ", ".join(
                str(constraint)
                for constraint in (*column_sqls, *constraints)
                if constraint
            ),
        }
        # Append the tablespace clause only when the backend produces one.
        if model._meta.db_tablespace:
            tablespace_sql = self.connection.ops.tablespace_sql(
                model._meta.db_tablespace
            )
            if tablespace_sql:
                sql += " " + tablespace_sql
        return sql, params

    # Field <-> database mapping functions

    def _iter_column_sql(
        self, column_db_type, params, model, field, field_db_params, include_default
    ):
        """
        Yield, in order, the SQL fragments that make up the column definition
        for `field`, starting with `column_db_type`.

        `params` is extended in place with the parameters required by the
        yielded fragments, so it is only complete once the generator has been
        exhausted.
        """
        yield column_db_type
        if collation := field_db_params.get("collation"):
            yield self._collate_sql(collation)
        if self.connection.features.supports_comments_inline and field.db_comment:
            yield self._comment_sql(field.db_comment)
        # Work out nullability.
        null = field.null
        # Add database default.
        if field.db_default is not NOT_PROVIDED:
            default_sql, default_params = self.db_default_sql(field)
            yield f"DEFAULT {default_sql}"
            params.extend(default_params)
            # A database default replaces the effective default below.
            include_default = False
        # Include a default value, if requested.
        include_default = (
            include_default
            and not self.skip_default(field)
            and
            # Don't include a default value if it's a nullable field and the
            # default cannot be dropped in the ALTER COLUMN statement (e.g.
            # MySQL longtext and longblob).
            not (null and self.skip_default_on_alter(field))
        )
        if include_default:
            default_value = self.effective_default(field)
            if default_value is not None:
                column_default = "DEFAULT " + self._column_default_sql(field)
                if self.connection.features.requires_literal_defaults:
                    # Some databases can't take defaults as a parameter
                    # (Oracle, SQLite). If this is the case, the individual
                    # schema backend should implement prepare_default().
                    yield column_default % self.prepare_default(default_value)
                else:
                    yield column_default
                    params.append(default_value)
        # Oracle treats the empty string ('') as null, so coerce the null
        # option whenever '' is a possible value.
        if (
            field.empty_strings_allowed
            and not field.primary_key
            and self.connection.features.interprets_empty_strings_as_nulls
        ):
            null = True
        # A generated column emits its GENERATED clause in place of the
        # NULL / NOT NULL marker.
        if field.generated:
            generated_sql, generated_params = self._column_generated_sql(field)
            params.extend(generated_params)
            yield generated_sql
        elif not null:
            yield "NOT NULL"
        elif not self.connection.features.implied_column_null:
            yield "NULL"
        # PRIMARY KEY takes precedence over UNIQUE.
        if field.primary_key:
            yield "PRIMARY KEY"
        elif field.unique:
            yield "UNIQUE"
        # Optionally add the tablespace if it's an implicitly indexed column.
        tablespace = field.db_tablespace or model._meta.db_tablespace
        if (
            tablespace
            and self.connection.features.supports_tablespaces
            and field.unique
        ):
            yield self.connection.ops.tablespace_sql(tablespace, inline=True)

    def column_sql(self, model, field, include_default=False):
        """
        Return the column definition for a field. The field must already have
        had set_attributes_from_name() called.
        """
        # Get the column's type and use that as the basis of the SQL.
        field_db_params = field.db_parameters(connection=self.connection)
        column_db_type = field_db_params["type"]
        # Check for fields that aren't actually columns (e.g. M2M).
        if column_db_type is None:
            return None, None
        params = []
        return (
            " ".join(
                # This appends to the params being returned.
                self._iter_column_sql(
                    column_db_type,
                    params,
                    model,
                    field,
                    field_db_params,
                    include_default,
                )
            ),
            params,
        )

    def skip_default(self, field):
        """
        Tell whether a default value must be left out for `field`. Backends
        that reject defaults on certain column types (e.g. MySQL longtext and
        longblob) override this; the base implementation never skips.
        """
        return False

    def skip_default_on_alter(self, field):
        """
        Tell whether a default value must be left out for `field` in ALTER
        COLUMN statements. Backends that reject such defaults on certain
        column types (e.g. MySQL longtext and longblob) override this; the
        base implementation never skips.
        """
        return False

    def prepare_default(self, value):
        """
        Hook for backends with the requires_literal_defaults feature; the
        base implementation always raises NotImplementedError.
        """
        message = (
            "subclasses of BaseDatabaseSchemaEditor for backends which have "
            "requires_literal_defaults must provide a prepare_default() method"
        )
        raise NotImplementedError(message)

    def _column_default_sql(self, field):
        """
        Return the SQL to use in a DEFAULT clause. The resulting string should
        contain a '%s' placeholder for a default value.
        """
        return "%s"

    def db_default_sql(self, field):
        """
        Compile the field's database default expression and return it as a
        (sql, params) pair.
        """
        from django.db.models.expressions import Value

        expression = field._db_default_expression
        # Plain values use the column default template; any other expression
        # is wrapped in parentheses.
        if isinstance(expression, Value):
            wrapper = self._column_default_sql(field)
        else:
            wrapper = "(%s)"
        compiler = Query(model=field.model).get_compiler(connection=self.connection)
        default_sql, params = compiler.compile(expression)
        if self.connection.features.requires_literal_defaults:
            # Some databases don't support parameterized defaults (Oracle,
            # SQLite). If this is the case, the individual schema backend
            # should implement prepare_default().
            literals = tuple(self.prepare_default(param) for param in params)
            default_sql = default_sql % literals
            params = []
        return wrapper % default_sql, params

    def _column_generated_sql(self, field):
        """Return the SQL to use in a GENERATED ALWAYS clause."""
        expression_sql, params = field.generated_sql(self.connection)
        persistency_sql = "STORED" if field.db_persist else "VIRTUAL"
        if self.connection.features.requires_literal_defaults:
            expression_sql = expression_sql % tuple(self.quote_value(p) for p in params)
            params = ()
        return f"GENERATED ALWAYS AS ({expression_sql}) {persistency_sql}", params

    @staticmethod
    def _effective_default(field):
        # This method allows testing its logic without a connection.
        if field.has_default():
            default = field.get_default()
        elif field.generated:
            default = None
        elif not field.null and field.blank and field.empty_strings_allowed:
            if field.get_internal_type() == "BinaryField":
                default = b""
            else:
                default = ""
        elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
            internal_type = field.get_internal_type()
            if internal_type == "DateTimeField":
                default = timezone.now()
            else:
                default = datetime.now()
                if internal_type == "DateField":
                    default = default.date()
                elif internal_type == "TimeField":
                    default = default.time()
        else:
            default = None
        return default

    def effective_default(self, field):
        """Return a field's effective database default value."""
        return field.get_db_prep_save(self._effective_default(field), self.connection)

    def quote_value(self, value):
        """
        Return a quoted version of the value so it's safe to use in an SQL
        string. This is not safe against injection from user code; it is
        intended only for use in making SQL scripts or preparing default values
        for particularly tricky backends (defaults are not user-defined, though,
        so this is safe).

        The base implementation always raises NotImplementedError; subclasses
        are expected to override it.
        """
        # Name the missing hook, in the same spirit as prepare_default().
        raise NotImplementedError(
            "subclasses of BaseDatabaseSchemaEditor must provide a "
            "quote_value() method"
        )

    # Actions

    def create_model(self, model):
        """
        Create a table and any accompanying indexes or unique constraints for
        the given `model`.
        """
        sql, params = self.table_sql(model)
        # Prevent using [] as params, in the case a literal '%' is used in the
        # definition on backends that don't support parametrized DDL.
        self.execute(sql, params or None)

        if self.connection.features.supports_comments:
            # Add table comment.
            if model._meta.db_table_comment:
                self.alter_db_table_comment(model, None, model._meta.db_table_comment)
            # Add column comments.
            if not self.connection.features.supports_comments_inline:
                for field in model._meta.local_fields:
                    if field.db_comment:
                        field_db_params = field.db_parameters(
                            connection=self.connection
                        )
                        field_type = field_db_params["type"]
                        self.execute(
                            *self._alter_column_comment_sql(
                                model, field, field_type, field.db_comment
                            )
                        )
        # Add any field index (deferred as SQLite _remake_table needs it).
        self.deferred_sql.extend(self._model_indexes_sql(model))

        # Make M2M tables
        for field in model._meta.local_many_to_many:
            if field.remote_field.through._meta.auto_created:
                self.create_model(field.remote_field.through)

    def delete_model(self, model):
        """Delete a model from the database."""
        # Handle auto-created intermediary models
        for field in model._meta.local_many_to_many:
            if field.remote_field.through._meta.auto_created:
                self.delete_model(field.remote_field.through)

        # Delete the table
        self.execute(
            self.sql_delete_table
            % {
                "table": self.quote_name(model._meta.db_table),
            }
        )
        # Remove all deferred statements referencing the deleted table.
        for sql in list(self.deferred_sql):
            if isinstance(sql, Statement) and sql.references_table(
                model._meta.db_table
            ):
                self.deferred_sql.remove(sql)

    def add_index(self, model, index):
        """Add an index on a model."""
        if (
            index.contains_expressions
            and not self.connection.features.supports_expression_indexes
        ):
            return None
        # Index.create_sql returns interpolated SQL which makes params=None a
        # necessity to avoid escaping attempts on execution.
        self.execute(index.create_sql(model, self), params=None)

    def remove_index(self, model, index):
        """Remove an index from a model."""
        if (
            index.contains_expressions
            and not self.connection.features.supports_expression_indexes
        ):
            return None
        self.execute(index.remove_sql(model, self))

    def rename_index(self, model, old_index, new_index):
        if self.connection.features.can_rename_index:
            self.execute(
                self._rename_index_sql(model, old_index.name, new_index.name),
                params=None,
            )
        else:
            self.remove_index(model, old_index)
            self.add_index(model, new_index)

    def add_constraint(self, model, constraint):
        """Add a constraint to a model."""
        sql = constraint.create_sql(model, self)
        if sql:
            # Constraint.create_sql returns interpolated SQL which makes
            # params=None a necessity to avoid escaping attempts on execution.
            self.execute(sql, params=None)

    def remove_constraint(self, model, constraint):
        """Remove a constraint from a model."""
        sql = constraint.remove_sql(model, self)
        if sql:
            self.execute(sql)

    def alter_unique_together(self, model, old_unique_together, new_unique_together):
        """
        Deal with a model changing its unique_together. The input
        unique_togethers must be doubly-nested, not the single-nested
        ["foo", "bar"] format.
        """
        olds = {tuple(fields) for fields in old_unique_together}
        news = {tuple(fields) for fields in new_unique_together}
        # Deleted uniques
        for fields in olds.difference(news):
            self._delete_composed_index(
                model,
                fields,
                {"unique": True, "primary_key": False},
                self.sql_delete_unique,
            )
        # Created uniques
        for field_names in news.difference(olds):
            fields = [model._meta.get_field(field) for field in field_names]
            self.execute(self._create_unique_sql(model, fields))

    def alter_index_together(self, model, old_index_together, new_index_together):
        """
        Deal with a model changing its index_together. The input
        index_togethers must be doubly-nested, not the single-nested
        ["foo", "bar"] format.
        """
        olds = {tuple(fields) for fields in old_index_together}
        news = {tuple(fields) for fields in new_index_together}
        # Deleted indexes
        for fields in olds.difference(news):
            self._delete_composed_index(
                model,
                fields,
                {"index": True, "unique": False},
                self.sql_delete_index,
            )
        # Created indexes
        for field_names in news.difference(olds):
            fields = [model._meta.get_field(field) for field in field_names]
            self.execute(self._create_index_sql(model, fields=fields, suffix="_idx"))

    def _delete_composed_index(self, model, fields, constraint_kwargs, sql):
        meta_constraint_names = {
            constraint.name for constraint in model._meta.constraints
        }
        meta_index_names = {constraint.name for constraint in model._meta.indexes}
        columns = [model._meta.get_field(field).column for field in fields]
        constraint_names = self._constraint_names(
            model,
            columns,
            exclude=meta_constraint_names | meta_index_names,
            **constraint_kwargs,
        )
        if (
            constraint_kwargs.get("unique") is True
            and constraint_names
            and self.connection.features.allows_multiple_constraints_on_same_fields
        ):
            # Constraint matching the unique_together name.
            default_name = str(
                self._unique_constraint_name(model._meta.db_table, columns, quote=False)
            )
            if default_name in constraint_names:
                constraint_names = [default_name]
        if len(constraint_names) != 1:
            raise ValueError(
                "Found wrong number (%s) of constraints for %s(%s)"
                % (
                    len(constraint_names),
                    model._meta.db_table,
                    ", ".join(columns),
                )
            )
        self.execute(self._delete_constraint_sql(sql, model, constraint_names[0]))

    def alter_db_table(self, model, old_db_table, new_db_table):
        """Rename the table a model points to."""
        if old_db_table == new_db_table or (
            self.connection.features.ignores_table_name_case
            and old_db_table.lower() == new_db_table.lower()
        ):
            return
        self.execute(
            self.sql_rename_table
            % {
                "old_table": self.quote_name(old_db_table),
                "new_table": self.quote_name(new_db_table),
            }
        )
        # Rename all references to the old table name.
        for sql in self.deferred_sql:
            if isinstance(sql, Statement):
                sql.rename_table_references(old_db_table, new_db_table)

    def alter_db_table_comment(self, model, old_db_table_comment, new_db_table_comment):
        if self.sql_alter_table_comment and self.connection.features.supports_comments:
            self.execute(
                self.sql_alter_table_comment
                % {
                    "table": self.quote_name(model._meta.db_table),
                    "comment": self.quote_value(new_db_table_comment or ""),
                }
            )

    def alter_db_tablespace(self, model, old_db_tablespace, new_db_tablespace):
        """Move a model's table between tablespaces."""
        self.execute(
            self.sql_retablespace_table
            % {
                "table": self.quote_name(model._meta.db_table),
                "old_tablespace": self.quote_name(old_db_tablespace),
                "new_tablespace": self.quote_name(new_db_tablespace),
            }
        )

    def add_field(self, model, field):
        """
        Create a field on a model. Usually involves adding a column, but may
        involve adding a table instead (for M2M fields).

        The column is added with its default included. Unless the field has a
        database default, that default is dropped again afterwards when there
        is an effective default and the backend doesn't skip defaults on
        alter. Foreign key constraints that can't go inline and field indexes
        are appended to self.deferred_sql.
        """
        # Special-case implicit M2M tables
        if field.many_to_many and field.remote_field.through._meta.auto_created:
            return self.create_model(field.remote_field.through)
        # Get the column's definition
        definition, params = self.column_sql(model, field, include_default=True)
        # It might not actually have a column behind it
        if definition is None:
            return
        if col_type_suffix := field.db_type_suffix(connection=self.connection):
            definition += f" {col_type_suffix}"
        # Check constraints can go on the column SQL here
        db_params = field.db_parameters(connection=self.connection)
        if db_params["check"]:
            definition += " " + self.sql_check_constraint % db_params
        if (
            field.remote_field
            and self.connection.features.supports_foreign_keys
            and field.db_constraint
        ):
            constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
            # Add FK constraint inline, if supported.
            if self.sql_create_column_inline_fk:
                to_table = field.remote_field.model._meta.db_table
                to_column = field.remote_field.model._meta.get_field(
                    field.remote_field.field_name
                ).column
                # Only the namespace part of the split table name is used.
                namespace, _ = split_identifier(model._meta.db_table)
                definition += " " + self.sql_create_column_inline_fk % {
                    "name": self._fk_constraint_name(model, field, constraint_suffix),
                    "namespace": (
                        "%s." % self.quote_name(namespace) if namespace else ""
                    ),
                    "column": self.quote_name(field.column),
                    "to_table": self.quote_name(to_table),
                    "to_column": self.quote_name(to_column),
                    "deferrable": self.connection.ops.deferrable_sql(),
                }
            # Otherwise, add FK constraints later.
            else:
                self.deferred_sql.append(
                    self._create_fk_sql(model, field, constraint_suffix)
                )
        # Build the SQL and run it
        sql = self.sql_create_column % {
            "table": self.quote_name(model._meta.db_table),
            "column": self.quote_name(field.column),
            "definition": definition,
        }
        # Prevent using [] as params, in the case a literal '%' is used in the
        # definition on backends that don't support parametrized DDL.
        self.execute(sql, params or None)
        # Drop the default if we need to
        if (
            field.db_default is NOT_PROVIDED
            and not self.skip_default_on_alter(field)
            and self.effective_default(field) is not None
        ):
            changes_sql, params = self._alter_column_default_sql(
                model, None, field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model._meta.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Add field comment, if required.
        if (
            field.db_comment
            and self.connection.features.supports_comments
            and not self.connection.features.supports_comments_inline
        ):
            field_type = db_params["type"]
            self.execute(
                *self._alter_column_comment_sql(
                    model, field, field_type, field.db_comment
                )
            )
        # Add an index, if required
        self.deferred_sql.extend(self._field_indexes_sql(model, field))
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()

    def remove_field(self, model, field):
        """
        Drop `field` from `model`.

        An auto-created M2M through model is removed as a whole table
        instead. Fields whose database type is None have no column and are
        left alone.
        """
        # An implicit M2M lives in its own table rather than in a column.
        is_implicit_m2m = (
            field.many_to_many and field.remote_field.through._meta.auto_created
        )
        if is_implicit_m2m:
            return self.delete_model(field.remote_field.through)
        # Nothing to drop when no column backs the field.
        column_type = field.db_parameters(connection=self.connection)["type"]
        if column_type is None:
            return
        # MySQL requires FK constraints to be deleted explicitly first.
        if field.remote_field:
            for fk_name in self._constraint_names(
                model, [field.column], foreign_key=True
            ):
                self.execute(self._delete_fk_sql(model, fk_name))
        # Issue the actual column drop.
        self.execute(
            self.sql_delete_column
            % {
                "table": self.quote_name(model._meta.db_table),
                "column": self.quote_name(field.column),
            }
        )
        # Some backends keep stale column info on an open connection.
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()
        # Discard deferred statements that still reference the dropped column.
        # They are collected first so the list isn't mutated while scanned.
        stale_statements = [
            statement
            for statement in self.deferred_sql
            if isinstance(statement, Statement)
            and statement.references_column(model._meta.db_table, field.column)
        ]
        for statement in stale_statements:
            self.deferred_sql.remove(statement)

    def alter_field(self, model, old_field, new_field, strict=False):
        """
        Change a field's database representation from `old_field` to
        `new_field` (type, uniqueness, nullability, default, column,
        constraints, etc.).

        `old_field` is needed to work out which changes are required. When
        `strict` is True, errors are raised if the existing column does not
        match `old_field` precisely.

        Raise ValueError for fields without a usable db_type, for changes to
        or from M2M fields (or adding/removing through=), and for changes to
        the definition of a generated field.
        """
        if not self._field_should_be_altered(old_field, new_field):
            return
        # Work out whether each side is backed by a column at all.
        old_db_params = old_field.db_parameters(connection=self.connection)
        new_db_params = new_field.db_parameters(connection=self.connection)
        old_type = old_db_params["type"]
        new_type = new_db_params["type"]

        old_is_undefined = old_type is None and old_field.remote_field is None
        new_is_undefined = new_type is None and new_field.remote_field is None
        if old_is_undefined or new_is_undefined:
            raise ValueError(
                "Cannot alter field %s into %s - they do not properly define "
                "db_type (are you using a badly-written custom field?)"
                % (old_field, new_field),
            )

        if old_type is None and new_type is None:
            # Both sides are M2M fields; what happens depends on whether the
            # through models are auto-created or user-defined.
            old_through = old_field.remote_field.through
            new_through = new_field.remote_field.through
            if old_through and new_through:
                old_auto = old_through._meta.auto_created
                new_auto = new_through._meta.auto_created
                if old_auto and new_auto:
                    return self._alter_many_to_many(
                        model, old_field, new_field, strict
                    )
                if not old_auto and not new_auto:
                    # Both sides have through models; this is a no-op.
                    return

        if old_type is None or new_type is None:
            raise ValueError(
                "Cannot alter field %s into %s - they are not compatible types "
                "(you cannot alter to or from M2M fields, or add or remove "
                "through= on M2M fields)" % (old_field, new_field)
            )

        # Generated fields cannot be redefined in place.
        generated_changed = old_field.generated != new_field.generated
        if not generated_changed and new_field.generated:
            generated_changed = (
                old_field.db_persist != new_field.db_persist
                or old_field.generated_sql(self.connection)
                != new_field.generated_sql(self.connection)
            )
        if generated_changed:
            raise ValueError(
                f"Modifying GeneratedFields is not supported - the field {new_field} "
                "must be removed and re-added with the new definition."
            )

        self._alter_field(
            model,
            old_field,
            new_field,
            old_type,
            new_type,
            old_db_params,
            new_db_params,
            strict,
        )

    def _field_db_check(self, field, field_db_params):
        # Always check constraints with the same mocked column name to avoid
        # recreating constrains when the column is renamed.
        check_constraints = self.connection.data_type_check_constraints
        data = field.db_type_parameters(self.connection)
        data["column"] = "__column_name__"
        try:
            return check_constraints[field.get_internal_type()] % data
        except KeyError:
            return None

    def _alter_field(
        self,
        model,
        old_field,
        new_field,
        old_type,
        new_type,
        old_db_params,
        new_db_params,
        strict=False,
    ):
        """
        Perform a "physical" (non-ManyToMany) field update.

        The steps run in this order: drop the constraints and indexes the
        change invalidates (the field's own FK, its unique constraint,
        incoming FKs, its index, and its check constraint), rename the column
        if needed, run the accumulated ALTER COLUMN actions (type, collation,
        comment, defaults, nullability), then recreate the primary key,
        unique constraint, index, FKs, and check constraint as required by
        `new_field`.

        `old_type`/`new_type` are the column types and
        `old_db_params`/`new_db_params` the db_parameters() dicts of the two
        fields. If `strict` is True, raise ValueError when the number of FK,
        unique, or check constraints found on the old column is not exactly
        one; `strict` is also passed on when deleting the primary key.
        """
        # Drop any FK constraints, we'll remake them later
        fks_dropped = set()
        if (
            self.connection.features.supports_foreign_keys
            and old_field.remote_field
            and old_field.db_constraint
            and self._field_should_be_altered(
                old_field,
                new_field,
                ignore={"db_comment"},
            )
        ):
            fk_names = self._constraint_names(
                model, [old_field.column], foreign_key=True
            )
            if strict and len(fk_names) != 1:
                raise ValueError(
                    "Found wrong number (%s) of foreign key constraints for %s.%s"
                    % (
                        len(fk_names),
                        model._meta.db_table,
                        old_field.column,
                    )
                )
            for fk_name in fk_names:
                # Remember the drop so the FK is recreated further below.
                fks_dropped.add((old_field.column,))
                self.execute(self._delete_fk_sql(model, fk_name))
        # Has unique been removed?
        if old_field.unique and (
            not new_field.unique or self._field_became_primary_key(old_field, new_field)
        ):
            # Find the unique constraint for this field
            meta_constraint_names = {
                constraint.name for constraint in model._meta.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                unique=True,
                primary_key=False,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    "Found wrong number (%s) of unique constraints for %s.%s"
                    % (
                        len(constraint_names),
                        model._meta.db_table,
                        old_field.column,
                    )
                )
            for constraint_name in constraint_names:
                self.execute(self._delete_unique_sql(model, constraint_name))
        # Drop incoming FK constraints if the field is a primary key or unique,
        # which might be a to_field target, and things are going to change.
        old_collation = old_db_params.get("collation")
        new_collation = new_db_params.get("collation")
        drop_foreign_keys = (
            self.connection.features.supports_foreign_keys
            and (
                (old_field.primary_key and new_field.primary_key)
                or (old_field.unique and new_field.unique)
            )
            and ((old_type != new_type) or (old_collation != new_collation))
        )
        if drop_foreign_keys:
            # '_meta.related_field' also contains M2M reverse fields, these
            # will be filtered out
            for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
                rel_fk_names = self._constraint_names(
                    new_rel.related_model, [new_rel.field.column], foreign_key=True
                )
                for fk_name in rel_fk_names:
                    self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
        # Removed an index? (no strict check, as multiple indexes are possible)
        # Remove indexes if db_index switched to False or a unique constraint
        # will now be used in lieu of an index. The following lines from the
        # truth table show all True cases; the rest are False:
        #
        # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique
        # ------------------------------------------------------------------------------
        # True               | False            | False              | False
        # True               | False            | False              | True
        # True               | False            | True               | True
        if (
            old_field.db_index
            and not old_field.unique
            and (not new_field.db_index or new_field.unique)
        ):
            # Find the index for this field
            meta_index_names = {index.name for index in model._meta.indexes}
            # Retrieve only BTREE indexes since this is what's created with
            # db_index=True.
            index_names = self._constraint_names(
                model,
                [old_field.column],
                index=True,
                type_=Index.suffix,
                exclude=meta_index_names,
            )
            for index_name in index_names:
                # The only way to check if an index was created with
                # db_index=True or with Index(['field'], name='foo')
                # is to look at its name (refs #28053).
                self.execute(self._delete_index_sql(model, index_name))
        # Change check constraints?
        old_db_check = self._field_db_check(old_field, old_db_params)
        new_db_check = self._field_db_check(new_field, new_db_params)
        if old_db_check != new_db_check and old_db_check:
            meta_constraint_names = {
                constraint.name for constraint in model._meta.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                check=True,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    "Found wrong number (%s) of check constraints for %s.%s"
                    % (
                        len(constraint_names),
                        model._meta.db_table,
                        old_field.column,
                    )
                )
            for constraint_name in constraint_names:
                self.execute(self._delete_check_sql(model, constraint_name))
        # Have they renamed the column?
        if old_field.column != new_field.column:
            self.execute(
                self._rename_field_sql(
                    model._meta.db_table, old_field, new_field, new_type
                )
            )
            # Rename all references to the renamed column.
            for sql in self.deferred_sql:
                if isinstance(sql, Statement):
                    sql.rename_column_references(
                        model._meta.db_table, old_field.column, new_field.column
                    )
        # Next, start accumulating actions to do
        # actions: (sql, params) fragments for ALTER COLUMN;
        # null_actions: NULL/NOT NULL fragments, possibly run separately;
        # post_actions: full statements run after the column was altered.
        actions = []
        null_actions = []
        post_actions = []
        # Type suffix change? (e.g. auto increment).
        old_type_suffix = old_field.db_type_suffix(connection=self.connection)
        new_type_suffix = new_field.db_type_suffix(connection=self.connection)
        # Type, collation, or comment change?
        if (
            old_type != new_type
            or old_type_suffix != new_type_suffix
            or old_collation != new_collation
            or (
                self.connection.features.supports_comments
                and old_field.db_comment != new_field.db_comment
            )
        ):
            fragment, other_actions = self._alter_column_type_sql(
                model, old_field, new_field, new_type, old_collation, new_collation
            )
            actions.append(fragment)
            post_actions.extend(other_actions)

        # Database-level default (db_default) added, changed, or removed?
        if new_field.db_default is not NOT_PROVIDED:
            if (
                old_field.db_default is NOT_PROVIDED
                or new_field.db_default != old_field.db_default
            ):
                actions.append(
                    self._alter_column_database_default_sql(model, old_field, new_field)
                )
        elif old_field.db_default is not NOT_PROVIDED:
            actions.append(
                self._alter_column_database_default_sql(
                    model, old_field, new_field, drop=True
                )
            )
        # When changing a column NULL constraint to NOT NULL with a given
        # default value, we need to perform 4 steps:
        #  1. Add a default for new incoming writes
        #  2. Update existing NULL rows with new default
        #  3. Replace NULL constraint with NOT NULL
        #  4. Drop the default again.
        # Default change?
        needs_database_default = False
        if (
            old_field.null
            and not new_field.null
            and new_field.db_default is NOT_PROVIDED
        ):
            old_default = self.effective_default(old_field)
            new_default = self.effective_default(new_field)
            if (
                not self.skip_default_on_alter(new_field)
                and old_default != new_default
                and new_default is not None
            ):
                needs_database_default = True
                actions.append(
                    self._alter_column_default_sql(model, old_field, new_field)
                )
        # Nullability change?
        if old_field.null != new_field.null:
            fragment = self._alter_column_null_sql(model, old_field, new_field)
            if fragment:
                null_actions.append(fragment)
        # Only if we have a default and there is a change from NULL to NOT NULL
        four_way_default_alteration = (
            new_field.has_default() or new_field.db_default is not NOT_PROVIDED
        ) and (old_field.null and not new_field.null)
        if actions or null_actions:
            if not four_way_default_alteration:
                # If we don't have to do a 4-way default alteration we can
                # directly run a (NOT) NULL alteration
                actions += null_actions
            # Combine actions together if we can (e.g. postgres)
            if self.connection.features.supports_combined_alters and actions:
                # Transpose [(sql, params), ...] into parallel tuples, then
                # merge them into one comma-separated ALTER statement.
                sql, params = tuple(zip(*actions))
                actions = [(", ".join(sql), sum(params, []))]
            # Apply those actions
            for sql, params in actions:
                self.execute(
                    self.sql_alter_column
                    % {
                        "table": self.quote_name(model._meta.db_table),
                        "changes": sql,
                    },
                    params,
                )
            if four_way_default_alteration:
                if new_field.db_default is NOT_PROVIDED:
                    # new_default was computed above: without a db_default, a
                    # 4-way alteration implies the NULL -> NOT NULL branch ran.
                    default_sql = "%s"
                    params = [new_default]
                else:
                    default_sql, params = self.db_default_sql(new_field)
                # Update existing rows with default value
                self.execute(
                    self.sql_update_with_default
                    % {
                        "table": self.quote_name(model._meta.db_table),
                        "column": self.quote_name(new_field.column),
                        "default": default_sql,
                    },
                    params,
                )
                # Since we didn't run a NOT NULL change before we need to do it
                # now
                for sql, params in null_actions:
                    self.execute(
                        self.sql_alter_column
                        % {
                            "table": self.quote_name(model._meta.db_table),
                            "changes": sql,
                        },
                        params,
                    )
        if post_actions:
            for sql, params in post_actions:
                self.execute(sql, params)
        # If primary_key changed to False, delete the primary key constraint.
        if old_field.primary_key and not new_field.primary_key:
            self._delete_primary_key(model, strict)
        # Added a unique?
        if self._unique_should_be_added(old_field, new_field):
            self.execute(self._create_unique_sql(model, [new_field]))
        # Added an index? Add an index if db_index switched to True or a unique
        # constraint will no longer be used in lieu of an index. The following
        # lines from the truth table show all True cases; the rest are False:
        #
        # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique
        # ------------------------------------------------------------------------------
        # False              | False            | True               | False
        # False              | True             | True               | False
        # True               | True             | True               | False
        if (
            (not old_field.db_index or old_field.unique)
            and new_field.db_index
            and not new_field.unique
        ):
            self.execute(self._create_index_sql(model, fields=[new_field]))
        # Type alteration on primary key? Then we need to alter the column
        # referring to us.
        rels_to_update = []
        if drop_foreign_keys:
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Changed to become primary key?
        if self._field_became_primary_key(old_field, new_field):
            # Make the new one
            self.execute(self._create_primary_key_sql(model, new_field))
            # Update all referencing columns
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Handle our type alters on the other end of rels from the PK stuff above
        for old_rel, new_rel in rels_to_update:
            rel_db_params = new_rel.field.db_parameters(connection=self.connection)
            rel_type = rel_db_params["type"]
            rel_collation = rel_db_params.get("collation")
            old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
            old_rel_collation = old_rel_db_params.get("collation")
            fragment, other_actions = self._alter_column_type_sql(
                new_rel.related_model,
                old_rel.field,
                new_rel.field,
                rel_type,
                old_rel_collation,
                rel_collation,
            )
            # fragment is a (sql, params) pair.
            self.execute(
                self.sql_alter_column
                % {
                    "table": self.quote_name(new_rel.related_model._meta.db_table),
                    "changes": fragment[0],
                },
                fragment[1],
            )
            for sql, params in other_actions:
                self.execute(sql, params)
        # Does it have a foreign key?
        if (
            self.connection.features.supports_foreign_keys
            and new_field.remote_field
            and (
                fks_dropped or not old_field.remote_field or not old_field.db_constraint
            )
            and new_field.db_constraint
        ):
            self.execute(
                self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
            )
        # Rebuild FKs that pointed to us if we previously had to drop them
        if drop_foreign_keys:
            for _, rel in rels_to_update:
                if rel.field.db_constraint:
                    self.execute(
                        self._create_fk_sql(rel.related_model, rel.field, "_fk")
                    )
        # Does it have check constraints we need to add?
        if old_db_check != new_db_check and new_db_check:
            constraint_name = self._create_index_name(
                model._meta.db_table, [new_field.column], suffix="_check"
            )
            self.execute(
                self._create_check_sql(model, constraint_name, new_db_params["check"])
            )
        # Drop the default if we need to
        # (Django usually does not use in-database defaults)
        if needs_database_default:
            changes_sql, params = self._alter_column_default_sql(
                model, old_field, new_field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model._meta.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()

    def _alter_column_null_sql(self, model, old_field, new_field):
        """
        Hook to specialize column null alteration.

        Return a (sql, params) fragment to set a column to null or non-null
        as required by new_field, or None if no changes are required.
        """
        if (
            self.connection.features.interprets_empty_strings_as_nulls
            and new_field.empty_strings_allowed
        ):
            # The field is nullable in the database anyway, leave it alone.
            return
        else:
            new_db_params = new_field.db_parameters(connection=self.connection)
            sql = (
                self.sql_alter_column_null
                if new_field.null
                else self.sql_alter_column_not_null
            )
            return (
                sql
                % {
                    "column": self.quote_name(new_field.column),
                    "type": new_db_params["type"],
                },
                [],
            )

    def _alter_column_default_sql(self, model, old_field, new_field, drop=False):
        """
        Hook to specialize column default alteration.

        Return a (sql, params) fragment to add or drop (depending on the drop
        argument) a default to new_field's column.
        """
        new_default = self.effective_default(new_field)
        default = self._column_default_sql(new_field)
        params = [new_default]

        if drop:
            params = []
        elif self.connection.features.requires_literal_defaults:
            # Some databases (Oracle) can't take defaults as a parameter
            # If this is the case, the SchemaEditor for that database should
            # implement prepare_default().
            default = self.prepare_default(new_default)
            params = []

        new_db_params = new_field.db_parameters(connection=self.connection)
        if drop:
            if new_field.null:
                sql = self.sql_alter_column_no_default_null
            else:
                sql = self.sql_alter_column_no_default
        else:
            sql = self.sql_alter_column_default
        return (
            sql
            % {
                "column": self.quote_name(new_field.column),
                "type": new_db_params["type"],
                "default": default,
            },
            params,
        )

    def _alter_column_database_default_sql(
        self, model, old_field, new_field, drop=False
    ):
        """
        Hook to specialize column database default alteration.

        Return a (sql, params) fragment to add or drop (depending on the drop
        argument) a default to new_field's column.
        """
        if drop:
            sql = self.sql_alter_column_no_default
            default_sql = ""
            params = []
        else:
            sql = self.sql_alter_column_default
            default_sql, params = self.db_default_sql(new_field)

        new_db_params = new_field.db_parameters(connection=self.connection)
        return (
            sql
            % {
                "column": self.quote_name(new_field.column),
                "type": new_db_params["type"],
                "default": default_sql,
            },
            params,
        )

    def _alter_column_type_sql(
        self, model, old_field, new_field, new_type, old_collation, new_collation
    ):
        """
        Hook to specialize column type alteration for different backends,
        for cases when a creation type is different to an alteration type
        (e.g. SERIAL in PostgreSQL, PostGIS fields).

        Return a 2-tuple of: an SQL fragment of (sql, params) to insert into
        an ALTER TABLE statement and a list of extra (sql, params) tuples to
        run once the field is altered.
        """
        other_actions = []
        if collate_sql := self._collate_sql(
            new_collation, old_collation, model._meta.db_table
        ):
            collate_sql = f" {collate_sql}"
        else:
            collate_sql = ""
        # Comment change?
        comment_sql = ""
        if self.connection.features.supports_comments and not new_field.many_to_many:
            if old_field.db_comment != new_field.db_comment:
                # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
                # 'COMMENT ON ...' at the same time.
                sql, params = self._alter_column_comment_sql(
                    model, new_field, new_type, new_field.db_comment
                )
                if sql:
                    other_actions.append((sql, params))
            if new_field.db_comment:
                comment_sql = self._comment_sql(new_field.db_comment)
        return (
            (
                self.sql_alter_column_type
                % {
                    "column": self.quote_name(new_field.column),
                    "type": new_type,
                    "collation": collate_sql,
                    "comment": comment_sql,
                },
                [],
            ),
            other_actions,
        )

    def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
        return (
            self.sql_alter_column_comment
            % {
                "table": self.quote_name(model._meta.db_table),
                "column": self.quote_name(new_field.column),
                "comment": self._comment_sql(new_db_comment),
            },
            [],
        )

    def _comment_sql(self, comment):
        return self.quote_value(comment or "")

    def _alter_many_to_many(self, model, old_field, new_field, strict):
        """Alter M2Ms to repoint their to= endpoints."""
        # Rename the through table
        if (
            old_field.remote_field.through._meta.db_table
            != new_field.remote_field.through._meta.db_table
        ):
            self.alter_db_table(
                old_field.remote_field.through,
                old_field.remote_field.through._meta.db_table,
                new_field.remote_field.through._meta.db_table,
            )
        # Repoint the FK to the other side
        self.alter_field(
            new_field.remote_field.through,
            # The field that points to the target model is needed, so we can
            # tell alter_field to change it - this is m2m_reverse_field_name()
            # (as opposed to m2m_field_name(), which points to our model).
            old_field.remote_field.through._meta.get_field(
                old_field.m2m_reverse_field_name()
            ),
            new_field.remote_field.through._meta.get_field(
                new_field.m2m_reverse_field_name()
            ),
        )
        self.alter_field(
            new_field.remote_field.through,
            # for self-referential models we need to alter field from the other end too
            old_field.remote_field.through._meta.get_field(old_field.m2m_field_name()),
            new_field.remote_field.through._meta.get_field(new_field.m2m_field_name()),
        )

    def _create_index_name(self, table_name, column_names, suffix=""):
        """
        Build a unique name for an index or unique constraint.

        The name has three underscore-joined parts: the table name, the
        column names, and a digest followed by `suffix`. When the full name
        exceeds the backend's maximum name length (200 if the backend
        reports none), the parts are truncated to fit.
        """
        _, table_name = split_identifier(table_name)
        digest = names_digest(table_name, *column_names, length=8)
        tail = "%s%s" % (digest, suffix)
        limit = self.connection.ops.max_name_length() or 200
        joined_columns = "_".join(column_names)
        # Use the untruncated name whenever it fits.
        candidate = "%s_%s_%s" % (table_name, joined_columns, tail)
        if len(candidate) <= limit:
            return candidate
        # Cap the digest+suffix at a third of the limit; slicing leaves a
        # tail that is already short enough unchanged.
        tail = tail[: limit // 3]
        # Table and column parts share the remaining space equally.
        part_length = (limit - len(tail)) // 2 - 1
        candidate = "%s_%s_%s" % (
            table_name[:part_length],
            joined_columns[:part_length],
            tail,
        )
        # Names may not start with an underscore or a digit on Oracle, so
        # prepend "D" and drop the last character to keep the length.
        first_char = candidate[0]
        if first_char == "_" or first_char.isdigit():
            candidate = "D%s" % candidate[:-1]
        return candidate

    def _get_index_tablespace_sql(self, model, fields, db_tablespace=None):
        if db_tablespace is None:
            if len(fields) == 1 and fields[0].db_tablespace:
                db_tablespace = fields[0].db_tablespace
            elif settings.DEFAULT_INDEX_TABLESPACE:
                db_tablespace = settings.DEFAULT_INDEX_TABLESPACE
            elif model._meta.db_tablespace:
                db_tablespace = model._meta.db_tablespace
        if db_tablespace is not None:
            return " " + self.connection.ops.tablespace_sql(db_tablespace)
        return ""

    def _index_condition_sql(self, condition):
        if condition:
            return " WHERE " + condition
        return ""

    def _index_include_sql(self, model, columns):
        if not columns or not self.connection.features.supports_covering_indexes:
            return ""
        return Statement(
            " INCLUDE (%(columns)s)",
            columns=Columns(model._meta.db_table, columns, self.quote_name),
        )

    def _create_index_sql(
        self,
        model,
        *,
        fields=None,
        name=None,
        suffix="",
        using="",
        db_tablespace=None,
        col_suffixes=(),
        sql=None,
        opclasses=(),
        condition=None,
        include=None,
        expressions=None,
    ):
        """
        Return the Statement that creates an index over one or more fields or
        expressions. Pass `sql` when the syntax differs from the standard
        template (GIS indexes, ...).

        When `name` is None the index name is generated the first time the
        statement needs it.
        """
        fields = fields or []
        expressions = expressions or []
        table = model._meta.db_table
        column_names = [field.column for field in fields]
        compiler = Query(model, alias_cols=False).get_compiler(
            connection=self.connection,
        )
        tablespace_sql = self._get_index_tablespace_sql(
            model, fields, db_tablespace=db_tablespace
        )

        def quoted_index_name(*args, **kwargs):
            # Generate the name once and reuse it on later calls.
            nonlocal name
            if name is None:
                name = self._create_index_name(*args, **kwargs)
            return self.quote_name(name)

        # Columns take precedence; expressions are used only without columns.
        if column_names:
            index_columns = self._index_columns(
                table, column_names, col_suffixes, opclasses
            )
        else:
            index_columns = Expressions(
                table, expressions, compiler, self.quote_value
            )

        return Statement(
            sql or self.sql_create_index,
            table=Table(table, self.quote_name),
            name=IndexName(table, column_names, suffix, quoted_index_name),
            using=using,
            columns=index_columns,
            extra=tablespace_sql,
            condition=self._index_condition_sql(condition),
            include=self._index_include_sql(model, include),
        )

    def _delete_index_sql(self, model, name, sql=None):
        """
        Return the statement that drops the index `name` on the model's table.
        A truthy `sql` replaces the default `sql_delete_index` template.
        """
        template = sql or self.sql_delete_index
        quoted_table = Table(model._meta.db_table, self.quote_name)
        return Statement(template, table=quoted_table, name=self.quote_name(name))

    def _rename_index_sql(self, model, old_name, new_name):
        """
        Return the statement that renames the index `old_name` on the model's
        table to `new_name`, built from the `sql_rename_index` template.
        """
        quoted_table = Table(model._meta.db_table, self.quote_name)
        quoted_old = self.quote_name(old_name)
        quoted_new = self.quote_name(new_name)
        return Statement(
            self.sql_rename_index,
            table=quoted_table,
            old_name=quoted_old,
            new_name=quoted_new,
        )

    def _index_columns(self, table, columns, col_suffixes, opclasses):
        """
        Return the Columns reference used in index statements. `opclasses` is
        accepted but not used by this implementation.
        """
        index_columns = Columns(
            table, columns, self.quote_name, col_suffixes=col_suffixes
        )
        return index_columns

    def _model_indexes_sql(self, model):
        """
        Return a list of all index SQL statements (field indexes, Meta.indexes)
        for the specified model.
        """
        if not model._meta.managed or model._meta.proxy or model._meta.swapped:
            return []
        output = []
        for field in model._meta.local_fields:
            output.extend(self._field_indexes_sql(model, field))

        for index in model._meta.indexes:
            if (
                not index.contains_expressions
                or self.connection.features.supports_expression_indexes
            ):
                output.append(index.create_sql(model, self))
        return output

    def _field_indexes_sql(self, model, field):
        """
        Return a list of all index SQL statements for the specified field.
        """
        output = []
        if self._field_should_be_indexed(model, field):
            output.append(self._create_index_sql(model, fields=[field]))
        return output

    def _field_should_be_altered(self, old_field, new_field, ignore=None):
        if not old_field.concrete and not new_field.concrete:
            return False
        ignore = ignore or set()
        _, old_path, old_args, old_kwargs = old_field.deconstruct()
        _, new_path, new_args, new_kwargs = new_field.deconstruct()
        # Don't alter when:
        # - changing only a field name
        # - changing an attribute that doesn't affect the schema
        # - changing an attribute in the provided set of ignored attributes
        # - adding only a db_column and the column name is not changed
        # - db_table does not change for model referenced by foreign keys
        for attr in ignore.union(old_field.non_db_attrs):
            old_kwargs.pop(attr, None)
        for attr in ignore.union(new_field.non_db_attrs):
            new_kwargs.pop(attr, None)
        if (
            not new_field.many_to_many
            and old_field.remote_field
            and new_field.remote_field
            and old_field.remote_field.model._meta.db_table
            == new_field.remote_field.model._meta.db_table
        ):
            old_kwargs.pop("to", None)
            new_kwargs.pop("to", None)
        # db_default can take many form but result in the same SQL.
        if (
            old_kwargs.get("db_default")
            and new_kwargs.get("db_default")
            and self.db_default_sql(old_field) == self.db_default_sql(new_field)
        ):
            old_kwargs.pop("db_default")
            new_kwargs.pop("db_default")
        return self.quote_name(old_field.column) != self.quote_name(
            new_field.column
        ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)

    def _field_should_be_indexed(self, model, field):
        return field.db_index and not field.unique

    def _field_became_primary_key(self, old_field, new_field):
        return not old_field.primary_key and new_field.primary_key

    def _unique_should_be_added(self, old_field, new_field):
        return (
            not new_field.primary_key
            and new_field.unique
            and (not old_field.unique or old_field.primary_key)
        )

    def _rename_field_sql(self, table, old_field, new_field, new_type):
        return self.sql_rename_column % {
            "table": self.quote_name(table),
            "old_column": self.quote_name(old_field.column),
            "new_column": self.quote_name(new_field.column),
            "type": new_type,
        }

    def _create_fk_sql(self, model, field, suffix):
        """
        Return the statement that adds a foreign key constraint from `field`
        on the model's table to the column of the field's target field.
        """
        source_table = model._meta.db_table
        target = field.target_field
        target_table = target.model._meta.db_table
        return Statement(
            self.sql_create_fk,
            table=Table(source_table, self.quote_name),
            name=self._fk_constraint_name(model, field, suffix),
            column=Columns(source_table, [field.column], self.quote_name),
            to_table=Table(target_table, self.quote_name),
            to_column=Columns(target_table, [target.column], self.quote_name),
            deferrable=self.connection.ops.deferrable_sql(),
        )

    def _fk_constraint_name(self, model, field, suffix):
        """
        Return a ForeignKeyName reference for the constraint from `field` to
        its target field. The name is produced by `_create_index_name()` and
        then quoted.
        """
        target = field.target_field

        def quoted_fk_name(*args, **kwargs):
            generated = self._create_index_name(*args, **kwargs)
            return self.quote_name(generated)

        # Only the second element of the split target table name is used.
        target_table = split_identifier(target.model._meta.db_table)[1]
        return ForeignKeyName(
            model._meta.db_table,
            [field.column],
            target_table,
            [target.column],
            suffix,
            quoted_fk_name,
        )

    def _delete_fk_sql(self, model, name):
        return self._delete_constraint_sql(self.sql_delete_fk, model, name)

    def _deferrable_constraint_sql(self, deferrable):
        if deferrable is None:
            return ""
        if deferrable == Deferrable.DEFERRED:
            return " DEFERRABLE INITIALLY DEFERRED"
        if deferrable == Deferrable.IMMEDIATE:
            return " DEFERRABLE INITIALLY IMMEDIATE"

    def _unique_index_nulls_distinct_sql(self, nulls_distinct):
        if nulls_distinct is False:
            return " NULLS NOT DISTINCT"
        elif nulls_distinct is True:
            return " NULLS DISTINCT"
        return ""

    def _unique_supported(
        self,
        condition=None,
        deferrable=None,
        include=None,
        expressions=None,
        nulls_distinct=None,
    ):
        return (
            (not condition or self.connection.features.supports_partial_indexes)
            and (
                not deferrable
                or self.connection.features.supports_deferrable_unique_constraints
            )
            and (not include or self.connection.features.supports_covering_indexes)
            and (
                not expressions or self.connection.features.supports_expression_indexes
            )
            and (
                nulls_distinct is None
                or self.connection.features.supports_nulls_distinct_unique_constraints
            )
        )

    def _unique_sql(
        self,
        model,
        fields,
        name,
        condition=None,
        deferrable=None,
        include=None,
        opclasses=None,
        expressions=None,
        nulls_distinct=None,
    ):
        if not self._unique_supported(
            condition=condition,
            deferrable=deferrable,
            include=include,
            expressions=expressions,
            nulls_distinct=nulls_distinct,
        ):
            return None

        if (
            condition
            or include
            or opclasses
            or expressions
            or nulls_distinct is not None
        ):
            # Databases support conditional, covering, functional unique,
            # and nulls distinct constraints via a unique index.
            sql = self._create_unique_sql(
                model,
                fields,
                name=name,
                condition=condition,
                include=include,
                opclasses=opclasses,
                expressions=expressions,
                nulls_distinct=nulls_distinct,
            )
            if sql:
                self.deferred_sql.append(sql)
            return None
        constraint = self.sql_unique_constraint % {
            "columns": ", ".join([self.quote_name(field.column) for field in fields]),
            "deferrable": self._deferrable_constraint_sql(deferrable),
        }
        return self.sql_constraint % {
            "name": self.quote_name(name),
            "constraint": constraint,
        }

    def _create_unique_sql(
        self,
        model,
        fields,
        name=None,
        condition=None,
        deferrable=None,
        include=None,
        opclasses=None,
        expressions=None,
        nulls_distinct=None,
    ):
        if not self._unique_supported(
            condition=condition,
            deferrable=deferrable,
            include=include,
            expressions=expressions,
            nulls_distinct=nulls_distinct,
        ):
            return None

        compiler = Query(model, alias_cols=False).get_compiler(
            connection=self.connection
        )
        table = model._meta.db_table
        columns = [field.column for field in fields]
        if name is None:
            name = self._unique_constraint_name(table, columns, quote=True)
        else:
            name = self.quote_name(name)
        if condition or include or opclasses or expressions:
            sql = self.sql_create_unique_index
        else:
            sql = self.sql_create_unique
        if columns:
            columns = self._index_columns(
                table, columns, col_suffixes=(), opclasses=opclasses
            )
        else:
            columns = Expressions(table, expressions, compiler, self.quote_value)
        return Statement(
            sql,
            table=Table(table, self.quote_name),
            name=name,
            columns=columns,
            condition=self._index_condition_sql(condition),
            deferrable=self._deferrable_constraint_sql(deferrable),
            include=self._index_include_sql(model, include),
            nulls_distinct=self._unique_index_nulls_distinct_sql(nulls_distinct),
        )

    def _unique_constraint_name(self, table, columns, quote=True):
        """
        Return an IndexName reference for a unique constraint on `columns`,
        using the "_uniq" suffix. The generated name is passed through
        `quote_name()` only when `quote` is truthy.
        """

        def quoted_name(*args, **kwargs):
            return self.quote_name(self._create_index_name(*args, **kwargs))

        name_factory = quoted_name if quote else self._create_index_name
        return IndexName(table, columns, "_uniq", name_factory)

    def _delete_unique_sql(
        self,
        model,
        name,
        condition=None,
        deferrable=None,
        include=None,
        opclasses=None,
        expressions=None,
        nulls_distinct=None,
    ):
        if not self._unique_supported(
            condition=condition,
            deferrable=deferrable,
            include=include,
            expressions=expressions,
            nulls_distinct=nulls_distinct,
        ):
            return None
        if condition or include or opclasses or expressions:
            sql = self.sql_delete_index
        else:
            sql = self.sql_delete_unique
        return self._delete_constraint_sql(sql, model, name)

    def _check_sql(self, name, check):
        return self.sql_constraint % {
            "name": self.quote_name(name),
            "constraint": self.sql_check_constraint % {"check": check},
        }

    def _create_check_sql(self, model, name, check):
        if not self.connection.features.supports_table_check_constraints:
            return None
        return Statement(
            self.sql_create_check,
            table=Table(model._meta.db_table, self.quote_name),
            name=self.quote_name(name),
            check=check,
        )

    def _delete_check_sql(self, model, name):
        if not self.connection.features.supports_table_check_constraints:
            return None
        return self._delete_constraint_sql(self.sql_delete_check, model, name)

    def _delete_constraint_sql(self, template, model, name):
        """
        Return a statement built from `template` with the model's table and
        the quoted constraint `name`.
        """
        quoted_table = Table(model._meta.db_table, self.quote_name)
        return Statement(template, table=quoted_table, name=self.quote_name(name))

    def _constraint_names(
        self,
        model,
        column_names=None,
        unique=None,
        primary_key=None,
        index=None,
        foreign_key=None,
        check=None,
        type_=None,
        exclude=None,
    ):
        """Return all constraint names matching the columns and conditions."""
        if column_names is not None:
            column_names = [
                (
                    self.connection.introspection.identifier_converter(
                        truncate_name(name, self.connection.ops.max_name_length())
                    )
                    if self.connection.features.truncates_names
                    else self.connection.introspection.identifier_converter(name)
                )
                for name in column_names
            ]
        with self.connection.cursor() as cursor:
            constraints = self.connection.introspection.get_constraints(
                cursor, model._meta.db_table
            )
        result = []
        for name, infodict in constraints.items():
            if column_names is None or column_names == infodict["columns"]:
                if unique is not None and infodict["unique"] != unique:
                    continue
                if primary_key is not None and infodict["primary_key"] != primary_key:
                    continue
                if index is not None and infodict["index"] != index:
                    continue
                if check is not None and infodict["check"] != check:
                    continue
                if foreign_key is not None and not infodict["foreign_key"]:
                    continue
                if type_ is not None and infodict["type"] != type_:
                    continue
                if not exclude or name not in exclude:
                    result.append(name)
        return result

    def _delete_primary_key(self, model, strict=False):
        constraint_names = self._constraint_names(model, primary_key=True)
        if strict and len(constraint_names) != 1:
            raise ValueError(
                "Found wrong number (%s) of PK constraints for %s"
                % (
                    len(constraint_names),
                    model._meta.db_table,
                )
            )
        for constraint_name in constraint_names:
            self.execute(self._delete_primary_key_sql(model, constraint_name))

    def _create_primary_key_sql(self, model, field):
        """
        Return the statement that adds a primary key constraint on the column
        of `field`. The constraint name is generated with the "_pk" suffix.
        """
        table_name = model._meta.db_table
        pk_name = self._create_index_name(table_name, [field.column], suffix="_pk")
        return Statement(
            self.sql_create_pk,
            table=Table(table_name, self.quote_name),
            name=self.quote_name(pk_name),
            columns=Columns(table_name, [field.column], self.quote_name),
        )

    def _delete_primary_key_sql(self, model, name):
        return self._delete_constraint_sql(self.sql_delete_pk, model, name)

    def _collate_sql(self, collation, old_collation=None, table_name=None):
        return "COLLATE " + self.quote_name(collation) if collation else ""

    def remove_procedure(self, procedure_name, param_types=()):
        """
        Execute the `sql_delete_procedure` template for the quoted
        `procedure_name`, with `param_types` joined by commas.
        """
        params = {
            "procedure": self.quote_name(procedure_name),
            "param_types": ",".join(param_types),
        }
        self.execute(self.sql_delete_procedure % params)