OCA/openupgradelib

View on GitHub
openupgradelib/openupgrade_merge_records.py

Summary

Maintainability
F
1 wk
Test Coverage
# -*- coding: utf-8 -*- # pylint: disable=C8202
# Copyright 2018 Tecnativa - Pedro M. Baeza
# Copyright 2018 Opener B.V. - Stefan Rijnhart
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import functools
import logging

from psycopg2 import IntegrityError, ProgrammingError, sql
from psycopg2.errorcodes import UNDEFINED_COLUMN, UNIQUE_VIOLATION
from psycopg2.extensions import AsIs

from .openupgrade import get_model2table, logged_query, version_info
from .openupgrade_tools import column_exists, table_exists

logger = logging.getLogger("OpenUpgrade")
logger.setLevel(logging.DEBUG)


def _change_foreign_key_refs(
    env,
    model_name,
    record_ids,
    target_record_id,
    exclude_columns,
    model_table,
    extra_where=None,
):
    # As found on https://stackoverflow.com/questions/1152260
    # /postgres-sql-to-list-table-foreign-keys
    # Adapted for specific Odoo structures like many2many tables
    env.cr.execute(
        """ SELECT tc.table_name, kcu.column_name
            FROM information_schema.table_constraints AS tc
            JOIN information_schema.key_column_usage AS kcu
                ON tc.constraint_name = kcu.constraint_name
                AND tc.table_schema = kcu.table_schema
            JOIN information_schema.constraint_column_usage AS ccu
                ON ccu.constraint_name = tc.constraint_name
                AND ccu.table_schema = tc.table_schema
            WHERE tc.constraint_type = 'FOREIGN KEY'
            AND ccu.table_name = %s and ccu.column_name = 'id'
        """,
        (model_table,),
    )
    for table, column in env.cr.fetchall():
        if (table, column) in exclude_columns:
            continue
        # Try one big swoop first
        env.cr.execute("SAVEPOINT sp1")  # can't use env.cr.savepoint() in base
        try:
            query = sql.SQL(
                """UPDATE {table}
                SET {column} = %(target_record_id)s
                WHERE {column} in %(record_ids)s"""
            ).format(
                table=sql.Identifier(table),
                column=sql.Identifier(column),
            )
            if extra_where:
                query += sql.SQL(extra_where)
            logged_query(
                env.cr,
                query,
                {
                    "record_ids": tuple(record_ids),
                    "target_record_id": target_record_id,
                },
                skip_no_result=True,
            )
        except (ProgrammingError, IntegrityError) as error:
            env.cr.execute("ROLLBACK TO SAVEPOINT sp1")
            if error.pgcode == UNDEFINED_COLUMN and extra_where:
                # extra_where is introducing a bad column. Ignore this table.
                continue
            elif error.pgcode != UNIQUE_VIOLATION:
                raise
            # Fallback on setting each row separately
            m2m_table = not column_exists(env.cr, table, "id")
            target_column = column if m2m_table else "id"
            env.cr.execute(
                """SELECT %(target_column)s FROM %(table)s
                WHERE "%(column)s" in %(record_ids)s""",
                {
                    "target_column": AsIs(target_column),
                    "table": AsIs(table),
                    "column": AsIs(column),
                    "record_ids": tuple(record_ids),
                },
            )
            for row in list(set([x[0] for x in env.cr.fetchall()])):
                env.cr.execute("SAVEPOINT sp2")
                try:
                    logged_query(
                        env.cr,
                        """UPDATE %(table)s
                        SET "%(column)s" = %(target_record_id)s
                        WHERE %(target_column)s = %(record_id)s""",
                        {
                            "target_column": AsIs(target_column),
                            "table": AsIs(table),
                            "column": AsIs(column),
                            "record_id": row,
                            "target_record_id": target_record_id,
                        },
                    )
                except (ProgrammingError, IntegrityError) as error:
                    env.cr.execute("ROLLBACK TO SAVEPOINT sp2")
                    if error.pgcode != UNIQUE_VIOLATION:
                        raise
                else:
                    env.cr.execute("RELEASE SAVEPOINT sp2")
            if m2m_table:
                # delete remaining values that could not be merged
                logged_query(
                    env.cr,
                    """DELETE FROM %(table)s
                    WHERE "%(column)s" in %(record_ids)s""",
                    {
                        "table": AsIs(table),
                        "column": AsIs(column),
                        "record_ids": tuple(record_ids),
                    },
                    skip_no_result=True,
                )
        else:
            env.cr.execute("RELEASE SAVEPOINT sp1")


def _change_many2one_refs_orm(
    env, model_name, record_ids, target_record_id, exclude_columns
):
    fields = env["ir.model.fields"].search(
        [
            ("ttype", "=", "many2one"),
            ("relation", "=", model_name),
        ]
    )
    for field in fields:
        try:
            model = env[field.model].with_context(active_test=False)
        except KeyError:
            continue
        field_name = field.name
        if (
            not model._auto
            or not model._fields.get(field_name)
            or not field.store
            or (model._table, field_name) in exclude_columns
        ):
            continue  # Discard SQL views + invalid fields + non-stored fields
        records = model.search([(field_name, "in", record_ids)])
        if records:
            records.write({field_name: target_record_id})
            logger.debug(
                "Changed %s record(s) in many2one field '%s' of model '%s'",
                len(records),
                field_name,
                field.model,
            )


def _change_many2many_refs_orm(
    env, model_name, record_ids, target_record_id, exclude_columns
):
    fields = env["ir.model.fields"].search(
        [
            ("ttype", "=", "many2many"),
            ("relation", "=", model_name),
        ]
    )
    for field in fields:
        try:
            model = env[field.model].with_context(active_test=False)
        except KeyError:
            continue
        field_name = field.name
        if (
            not model._auto
            or not model._fields.get(field_name)
            or not field.store
            or (model._table, field_name) in exclude_columns
        ):
            continue  # Discard SQL views + invalid fields + non-stored fields
        records = model.search([(field_name, "in", record_ids)])
        if records:
            records.write(
                {
                    field_name: (
                        [(3, x) for x in record_ids] + [(4, target_record_id)]
                    ),
                }
            )
            logger.debug(
                "Changed %s record(s) in many2many field '%s' of model '%s'",
                len(records),
                field_name,
                field.model,
            )


def _change_reference_refs_sql(
    env, model_name, record_ids, target_record_id, exclude_columns
):
    cr = env.cr
    cr.execute(
        """
        SELECT model, name
        FROM ir_model_fields
        WHERE ttype='reference'
        """
    )
    rows = cr.fetchall()
    if ("ir.property", "value_reference") not in rows:
        rows.append(("ir.property", "value_reference"))
    for row in rows:
        try:
            model = env[row[0]]
            if not model._auto:  # Discard SQL views
                continue
            table = model._table
        except KeyError:
            table = get_model2table(row[0])
        if not table_exists(cr, table):
            continue
        column = row[1]
        if not column_exists(cr, table, column) or ((table, column) in exclude_columns):
            continue
        where = " OR ".join(
            ["%s = '%s,%s'" % (column, model_name, x) for x in record_ids]
        )
        logged_query(
            cr,
            """
            UPDATE %s
            SET %s = %s
            WHERE %s
            """,
            (
                AsIs(table),
                AsIs(column),
                "%s,%s" % (model_name, target_record_id),
                AsIs(where),
            ),
            skip_no_result=True,
        )


def _change_reference_refs_orm(
    env, model_name, record_ids, target_record_id, exclude_columns
):
    fields = env["ir.model.fields"].search([("ttype", "=", "reference")])
    if version_info[0] >= 12:
        fields |= env.ref("base.field_ir_property__value_reference")
    else:
        fields |= env.ref("base.field_ir_property_value_reference")
    for field in fields:
        try:
            model = env[field.model].with_context(active_test=False)
        except KeyError:
            continue
        field_name = field.name
        if (
            not model._auto
            or not model._fields.get(field_name)
            or not field.store
            or (model._table, field_name) in exclude_columns
        ):
            continue  # Discard SQL views + invalid fields + non-stored fields
        expr = ["%s,%s" % (model_name, x) for x in record_ids]
        records = model.search([(field_name, "in", expr)])
        if records:
            records.write(
                {
                    field_name: "%s,%s" % (model_name, target_record_id),
                }
            )
            logger.debug(
                "Changed %s record(s) in reference field '%s' of model '%s'",
                len(records),
                field_name,
                field.model,
            )


def _change_translations_orm(
    env, model_name, record_ids, target_record_id, exclude_columns
):
    if version_info[0] > 15:
        return
    if ("ir_translation", "res_id") in exclude_columns:
        return
    translation_obj = env["ir.translation"]
    groups = translation_obj.read_group(
        [
            ("type", "=", "model"),
            ("res_id", "in", record_ids),
            ("name", "like", "%s,%%" % model_name),
        ],
        ["name", "lang"],
        ["name", "lang"],
        lazy=False,
    )
    for group in groups:
        target_translation = translation_obj.search(
            [
                ("type", "=", "model"),
                ("res_id", "=", target_record_id),
                ("name", "=", group["name"]),
                ("lang", "=", group["lang"]),
            ]
        )
        records = translation_obj.search(group["__domain"])
        if not target_translation and records:
            # There is no target translation, we pick one for being the new one
            records[:1].res_id = target_record_id
            records = records[1:]
        if records:
            records.unlink()
            logger.debug(
                "Deleted %s extra translations for %s (lang = %s).",
                len(records),
                group["name"],
                group["lang"],
            )


def _change_translations_sql(
    env, model_name, record_ids, target_record_id, exclude_columns
):
    if version_info[0] > 15:
        return
    if ("ir_translation", "res_id") in exclude_columns:
        return
    logged_query(
        env.cr,
        """
        UPDATE ir_translation it
        SET res_id = %(target_record_id)s
        FROM (
            SELECT min(it.id) as id, it.name, it.lang
            FROM ir_translation it
            LEFT JOIN ir_translation it2 ON (
                it2.type = it.type AND it2.name = it.name
                AND it2.lang = it.lang AND it2.res_id = %(target_record_id)s)
            WHERE it.type = 'model' AND it.res_id IN %(record_ids)s
                AND it.name like %(model_name)s || ',%%' AND it2.id IS NULL
            GROUP BY it.name, it.lang
        ) AS to_update
        WHERE it.id = to_update.id""",
        {
            "target_record_id": target_record_id,
            "record_ids": tuple(record_ids),
            "model_name": model_name,
        },
        skip_no_result=True,
    )
    logged_query(
        env.cr,
        """
        DELETE FROM ir_translation it
        USING (
            SELECT it.id
            FROM ir_translation it
            WHERE it.type = 'model' AND it.res_id IN %(record_ids)s
                AND it.name like %(model_name)s || ',%%'
        ) AS to_delete
        WHERE it.id = to_delete.id""",
        {
            "target_record_id": target_record_id,
            "record_ids": record_ids,
            "model_name": model_name,
        },
    )


# flake8: noqa: C901
def apply_operations_by_field_type(
    env,
    model_name,
    record_ids,
    target_record_id,
    field_spec,
    field_vals,
    field_type,
    column,
    operation,
    method,
):
    vals = {}
    o2m_changes = 0
    if method == "orm":
        model = env[model_name]
        all_records = model.browse((target_record_id,) + tuple(record_ids))
        target_record = model.browse(target_record_id)
        first_value = getattr(target_record, column)
        field = model._fields[column]
    else:
        first_value = field_vals[0]
    if field_type in ("char", "text", "html"):
        if not operation:
            operation = "other" if field_type == "char" else "merge"
        if operation == "first_not_null":
            field_vals = [x for x in field_vals if x]
            if field_vals:
                vals[column] = field_vals[0]
        elif operation == "merge":
            _list = filter(lambda x: x, field_vals)
            vals[column] = " | ".join(_list)
    elif field_type in ("jsonb", "serialized"):
        operation = operation or "first_not_null"
        if operation == "first_not_null":
            field_vals.reverse()
            field_val = {}
            for x in field_vals:
                field_val |= x or {}
            if field_val:
                if method == "sql":
                    if field_type == "serialized":
                        import json

                        vals[column] = json.dumps(field_val)
                    elif field_type == "jsonb":
                        from psycopg2.extras import Json

                        vals[column] = Json(field_val)
                else:
                    vals[column] = field_val
    elif field_type in ("integer", "float", "monetary"):
        if operation or field_type != "integer":
            field_vals = [0 if not x else x for x in field_vals]
        if not operation:
            operation = "other" if field_type == "integer" else "sum"
        if operation == "sum":
            vals[column] = sum(field_vals)
        elif operation == "avg":
            vals[column] = sum(field_vals) / len(field_vals)
        elif operation == "max":
            vals[column] = max(field_vals)
        elif operation == "min":
            vals[column] = min(field_vals)
        elif operation == "first_not_null":
            field_vals = [x for x in field_vals if x]
            if field_vals:
                vals[column] = field_vals[0]
    elif field_type == "boolean":
        if operation:
            field_vals = [False if x is None else x for x in field_vals]
        operation = operation or "other"
        if operation == "and":
            vals[column] = functools.reduce(lambda x, y: x & y, field_vals)
        elif operation == "or":
            vals[column] = functools.reduce(lambda x, y: x | y, field_vals)
    elif field_type in ("date", "datetime"):
        if operation:
            field_vals = list(filter(lambda x: x, field_vals))
        operation = field_vals and operation or "other"
        if operation == "max":
            vals[column] = max(field_vals)
        elif operation == "min":
            vals[column] = min(field_vals)
        elif operation == "first_not_null":
            field_vals = [x for x in field_vals if x]
            if field_vals:
                vals[column] = field_vals[0]
    elif field_type == "many2many" and method == "orm":
        operation = operation or "merge"
        if operation == "merge":
            field_vals = filter(lambda x: x is not False, field_vals)
            vals[column] = [(4, x.id) for x in field_vals]
    elif field_type == "one2many" and method == "orm":
        operation = operation or "merge"
        if operation == "merge":
            o2m_changes += 1
            field_vals.write({field.inverse_name: target_record_id})
    elif field_type == "binary":
        operation = operation or "merge"
        if operation == "merge":
            field_vals = [x for x in field_vals if x]
            if not first_value and field_vals:
                vals[column] = field_vals[0]
    elif field_type in ("many2one", "reference"):
        operation = operation or "merge"
        if operation == "merge":
            if method != "orm":
                field_vals = [x for x in field_vals if x]
            if not first_value and field_vals:
                vals[column] = field_vals[0]
    elif (
        field_type == "many2one_reference"
        and method == "orm"
        and field.model_field in model._fields
    ):
        operation = operation or "merge"
        if operation == "merge":
            if field.model_field in field_spec:
                del field_spec[field.model_field]
            list_model_field = all_records.mapped(field.model_field)
            zip_list = [(x, y) for x, y in zip(field_vals, list_model_field) if x and y]
            if first_value and zip_list:
                vals[column] = zip_list[0][0]
                vals[field.model_field] = zip_list[0][1]
    elif field_type == "selection":
        if operation == "first_not_null":
            field_vals = [x for x in field_vals if x]
            if field_vals:
                vals[column] = field_vals[0]
    if method == "orm":
        return vals, o2m_changes
    else:
        return vals


def _adjust_merged_values_orm(
    env, model_name, record_ids, target_record_id, field_spec
):
    """This method deals with the values on the records to be merged +
    the target record, performing operations that make sense on the meaning
    of the model.

    :param: field_spec: Dictionary with field names as keys and forced
      operation to perform as values. If a field is not present here, default
      operation will be performed.
      Note: If you pass 'openupgrade_other_fields': 'preserve' in the dict,
      the fields that are not specified in the dict will not be adjusted.

      Possible operations by field types:

      * Char, Text and Html fields:
        - 'merge' (default for Text and Html): content is concatenated
          with an ' | ' as separator
        - 'first_not_null': Put first not null value.
        - other value (default for Char): content on target record is preserved
      * Integer, Float and Monetary fields:
        - 'sum' (default for Float and Monetary): Sum all the values of
          the records.
        - 'avg': Perform the arithmetic average of the values of the records.
        - 'max': Put the maximum of all the values.
        - 'min': Put the minimum of all the values.
        - 'first_not_null': Put first non-zero value.
        - other value (default for Integer): content on target record
          is preserved
      * Binary field:
        - 'merge' (default): apply first not null value of the records if
        value of target record is null, preserve target value otherwise.
        - other value: content on target record is preserved
      * Boolean field:
        - 'and': Perform a logical AND over all values.
        - 'or': Perform a logical OR over all values.
        - other value (default): content on target record is preserved
      * Date and Datetime fields:
        - 'max': Put the maximum of all the values.
        - 'min': Put the minimum of all the values.
        - 'first_not_null': Put first defined Date(time) value.
        - other value (default): content on target record is preserved
      * Many2one fields:
        - 'merge' (default): apply first not null value of the records if
        value of target record is null, preserve target value otherwise.
        - other value: content on target record is preserved
      * Many2many fields:
        - 'merge' (default): combine all the values
        - other value: content on target record is preserved
      * One2many fields:
        - 'merge' (default): combine all the values
        - other value: content on target record is preserved
      * Many2manyReference fields:
        - 'merge' (default): if its model_field is in field_spec,
        delete it from there. Apply first positive (on field and
        corresponding model_field) of the records if value of target record
        is not positive, preserve target value otherwise.
        - other value: content on target record is preserved
      * Reference fields:
        - 'merge' (default): apply first not null value of the records if
        value of target record is null, preserve target value otherwise.
        - other value: content on target record is preserved
      * Selection fields:
        - any value: content on target record is preserved
        - 'first_not_null': Put first not null value.
      * Serialized fields:
        - 'first_not_null' (default): For each found key, put first not null value.
        - other value: content on target record is preserved
      * Translatable (in v16 or greater) fields as 'Jsonb' columns:
        - 'first_not_null' (default): For each found key, put first not null value.
        - other value: content on target record is preserved
    """
    model = env[model_name]
    fields = model._fields.values()
    all_records = model.browse((target_record_id,) + tuple(record_ids))
    target_record = model.browse(target_record_id)
    vals = {}
    o2m_changes = 0
    for field in fields:
        if (
            field_spec.get("openupgrade_other_fields", "") == "preserve"
            and field.name not in field_spec
        ):
            continue
        if not field.store or field.compute or field.related:
            continue  # don't do anything on these cases
        op = field_spec.get(field.name, False)
        if field.type != "reference":
            _list = all_records.mapped(field.name)
        else:
            _list = [x[field.name] for x in all_records if x[field.name]]
        field_vals, field_o2m_changes = apply_operations_by_field_type(
            env,
            model_name,
            record_ids,
            target_record_id,
            field_spec,
            _list,
            field.type if not (version_info[0] > 15 and field.translate) else "jsonb",
            field.name,
            op,
            "orm",
        )
        vals.update(field_vals)
        o2m_changes += field_o2m_changes
    if not vals:
        return
    # Curate values that haven't changed
    new_vals = {}
    for f in vals:
        if model._fields[f].type != "many2many":
            if vals[f] != getattr(target_record, f):
                new_vals[f] = vals[f]
        else:
            if [x[1] for x in vals[f]] not in getattr(target_record, f).ids:
                new_vals[f] = vals[f]
    if new_vals:
        target_record.write(new_vals)
        logger.debug(
            "Write %s value(s) in target record '%s' of model '%s'",
            len(new_vals) + o2m_changes,
            target_record_id,
            model_name,
        )


def _adjust_merged_values_sql(
    env, model_name, record_ids, target_record_id, model_table, field_spec
):
    """This method deals with the values on the records to be merged +
    the target record, performing operations that make sense on the meaning
    of the model.

    :param: field_spec: Dictionary with field names as keys and forced
      operation to perform as values. If a field is not present here, default
      operation will be performed.
      Note: If you pass 'openupgrade_other_fields': 'preserve' in the dict,
      the fields that are not specified in the dict will not be adjusted.

      Possible operations by field types same as _adjust_merged_values_orm.
    """
    if not column_exists(env.cr, model_table, "id"):
        # TODO: handle one2many and many2many
        return
    env.cr.execute(
        """
        SELECT isc.column_name, isc.data_type, imf.ttype
        FROM information_schema.columns isc
        JOIN ir_model_fields imf ON (
            imf.name = isc.column_name AND imf.model = %s)
        WHERE isc.table_name = %s
        """,
        (model_name, model_table),
    )
    dict_column_type = env.cr.fetchall()
    columns = ", ".join([x[0] for x in dict_column_type])
    env.cr.execute(
        """SELECT {columns}
        FROM {table}
        WHERE id IN %(record_ids)s""".format(
            table=model_table,
            columns=columns,
        ),
        {"record_ids": (target_record_id,) + tuple(record_ids)},
    )
    lists = list(zip(*(env.cr.fetchall())))
    new_vals = {}
    vals = {}
    for i, (column, column_type, field_type) in enumerate(dict_column_type):
        if (
            field_spec.get("openupgrade_other_fields", "") == "preserve"
            and column not in field_spec
        ):
            continue
        op = field_spec.get(column, False)
        _list = list(lists[i])
        if column_type == "jsonb":
            field_type = column_type
        if field_type == "serialized":
            import json

            _list = [x if isinstance(x, dict) else json.loads(x) for x in _list]
        field_vals = apply_operations_by_field_type(
            env,
            model_name,
            record_ids,
            target_record_id,
            field_spec,
            _list,
            field_type,
            column,
            op,
            "sql",
        )
        vals.update(field_vals)
    if not vals:
        return
    # Curate values that haven't changed
    env.cr.execute(
        """SELECT {columns}
        FROM {table}
        WHERE id = %(target_record_id)s
        """.format(
            table=model_table, columns=", ".join(list(vals.keys()))
        ),
        {"target_record_id": target_record_id},
    )
    record_vals = env.cr.dictfetchall()
    for column in vals:
        if vals[column] != record_vals[0]:
            new_vals[column] = vals[column]
    if new_vals:
        ident_dict = {x: sql.Identifier(x) for x in new_vals.keys()}
        query = sql.SQL(
            "UPDATE {table} SET {set_value} WHERE {id} = %(target_record_id)s"
        ).format(
            table=sql.Identifier(model_table),
            id=sql.Identifier("id"),
            set_value=sql.SQL(
                ", ".join(
                    [
                        "{{{field}}} = %({field})s".format(field=x)
                        for x in new_vals.keys()
                    ]
                )
            ).format(**ident_dict),
        )
        new_vals["target_record_id"] = target_record_id
        logged_query(env.cr, query, new_vals)


def _change_generic(
    env,
    model_name,
    record_ids,
    target_record_id,
    exclude_columns,
    method="orm",
    new_model_name=None,
):
    """Update known generic style res_id/res_model references.
    :param env: ORM environment
    :param model_name: Name of the model that have the generic references.
    :param record_ids: List of ids of the records to be changed.
    :param target_record_id: ID of the target record to host the source records
    :param exclude_columns: list of columns to exclude from the update
    :param method: 'orm' or 'sql'
    :param new_model_name: If specified, name of the new model to use in the
      references. This is useful for being used outside the merge records
      feature, for example when replacing one model per another (i.e.:
      account.invoice > account.move).
    """
    for model_to_replace, res_id_column, model_column in [
        ("calendar.event", "res_id", "res_model"),
        ("ir.attachment", "res_id", "res_model"),
        ("mail.activity", "res_id", "res_model"),
        ("mail.followers", "res_id", "res_model"),
        ("mail.message", "res_id", "model"),
        ("rating.rating", "res_id", "res_model"),
    ]:
        try:
            model = env[model_to_replace].with_context(active_test=False)
            table = model._table
        except KeyError:
            if method == "orm":
                continue
            table = get_model2table(model_to_replace)
        if (table, res_id_column) in exclude_columns:
            continue
        if method == "orm":
            if not model._fields.get(model_column) or not model._fields.get(
                res_id_column
            ):
                continue
            records = model.search(
                [(model_column, "=", model_name), (res_id_column, "in", record_ids)]
            )
            if records:
                vals = {res_id_column: target_record_id}
                if new_model_name:
                    vals[model_column] = new_model_name
                if model_to_replace != "mail.followers":
                    records.write(vals)
                else:
                    # We need to avoid duplicated results in this model
                    target_duplicated = model.search(
                        [
                            (model_column, "=", model_name),
                            (res_id_column, "=", target_record_id),
                            ("partner_id", "in", records.mapped("partner_id").ids),
                        ]
                    )
                    dup_partners = target_duplicated.mapped("partner_id")
                    duplicated = records.filtered(
                        lambda x: (x.partner_id in dup_partners)
                    )
                    (records - duplicated).write(vals)
                    duplicated.unlink()
                logger.debug(
                    "Changed %s record(s) of model '%s'", len(records), model_to_replace
                )
        else:
            if not column_exists(env.cr, table, res_id_column) or not column_exists(
                env.cr, table, model_column
            ):
                continue
            format_args = {
                "table": sql.Identifier(table),
                "res_id_column": sql.Identifier(res_id_column),
                "model_column": sql.Identifier(model_column),
            }
            query_args = {
                "model_name": model_name,
                "new_model_name": new_model_name or model_name,
                "target_record_id": target_record_id,
                "record_ids": tuple(record_ids),
            }
            query = sql.SQL(
                "UPDATE {table} SET {res_id_column} = %(target_record_id)s"
            ).format(**format_args)
            if new_model_name:
                query += sql.SQL(", {model_column} = %(new_model_name)s").format(
                    **format_args
                )
            query += sql.SQL(" WHERE {model_column} = %(model_name)s ").format(
                **format_args
            )
            if model_to_replace != "mail.followers":
                query += sql.SQL("AND {res_id_column} in %(record_ids)s").format(
                    **format_args
                )
                logged_query(env.cr, query, query_args, skip_no_result=True)
            else:
                for record_id in record_ids:
                    query_args["record_id"] = record_id
                    query2 = (
                        query
                        + sql.SQL(
                            """AND {res_id_column} = %(record_id)s
                        AND partner_id NOT IN (
                            SELECT partner_id FROM {table}
                            WHERE {res_id_column} = %(target_record_id)s
                                AND {model_column} = %(new_model_name)s
                        )"""
                        ).format(**format_args)
                    )
                    logged_query(
                        env.cr,
                        query2,
                        query_args,
                        skip_no_result=True,
                    )
                # Remove remaining records non updated (that are duplicates)
                logged_query(
                    env.cr,
                    sql.SQL(
                        "DELETE FROM {table} "
                        "WHERE {model_column} = %(model_name)s "
                        "AND {res_id_column} IN %(record_ids)s"
                    ).format(**format_args),
                    query_args,
                    skip_no_result=True,
                )


def _delete_records_sql(
    env, model_name, record_ids, target_record_id, model_table=None
):
    if not model_table:
        try:
            model_table = env[model_name]._table
        except KeyError:
            model_table = get_model2table(model_name)
    logged_query(
        env.cr,
        "DELETE FROM ir_model_data WHERE model = %s AND res_id IN %s",
        (model_name, tuple(record_ids)),
    )
    logged_query(
        env.cr,
        "DELETE FROM ir_attachment WHERE res_model = %s AND res_id IN %s",
        (model_name, tuple(record_ids)),
    )
    logged_query(
        env.cr,
        sql.SQL("DELETE FROM {} WHERE id IN %s").format(sql.Identifier(model_table)),
        (tuple(record_ids),),
    )


def _delete_records_orm(env, model_name, record_ids, target_record_id):
    records = env[model_name].browse(record_ids).exists()
    if records:
        records.unlink()
        logger.debug(
            "Deleted %s source record(s) of model '%s'",
            len(record_ids),
            model_name,
        )


def _check_recurrence(env, model_name, record_ids, target_record_id, model_table=None):
    if not model_table:
        try:
            model_table = env[model_name]._table
        except KeyError:
            model_table = get_model2table(model_name)
    env.cr.execute(
        """
        SELECT tc.table_name, kcu.column_name, COALESCE(imf.column1, 'id')
        FROM information_schema.table_constraints AS tc
        JOIN information_schema.key_column_usage AS kcu
            ON tc.constraint_name = kcu.constraint_name
            AND tc.table_schema = kcu.table_schema
        JOIN information_schema.constraint_column_usage AS ccu
            ON ccu.constraint_name = tc.constraint_name
            AND ccu.table_schema = tc.table_schema
        JOIN ir_model_fields AS imf
            ON imf.model = %s AND imf.relation = imf.model AND ((
                imf.name = kcu.column_name AND
                tc.table_name = ccu.table_name) OR (
                imf.column2 = kcu.column_name AND
                tc.table_name = imf.relation_table))
        WHERE tc.constraint_type = 'FOREIGN KEY'
        AND ccu.table_name = %s and ccu.column_name = 'id'
        """,
        (model_name, model_table),
    )
    for table, column, origin in env.cr.fetchall():
        query = sql.SQL(
            """SELECT {column} FROM {table}
            WHERE {origin} = %(target_record_id)s"""
        ).format(
            table=sql.Identifier(table),
            column=sql.Identifier(column),
            origin=sql.Identifier(origin),
        )
        env.cr.execute(
            query,
            {
                "target_record_id": target_record_id,
            },
        )
        new_parent_row = env.cr.fetchall()
        if new_parent_row and new_parent_row[0] in record_ids:
            # When we already have recursive hierarchy, doing a
            # merge of a parent into one of their children let the
            # awkward situation where the child points to itself,
            # so we avoid it checking this condition
            logger.info(
                "Couldn't merge %s record(s) of model %s to record_id %s"
                " to avoid recursion with field %s of table %s",
                len(record_ids),
                model_name,
                target_record_id,
                origin,
                table,
            )
            return True
    return False


def merge_records(
    env,
    model_name,
    record_ids,
    target_record_id,
    field_spec=None,
    method="orm",
    delete=True,
    exclude_columns=None,
    model_table=None,
):
    """Merge several records into the target one.

    NOTE: This should be executed in end migration scripts for assuring that
    all the possible relations are loaded and changed. Tested on v10/v11.

    :param env: Environment variable
    :param model_name: Name of the model of the records to merge
    :param record_ids: List of IDS of records that are going to be merged.
    :param target_record_id: ID of the record where the rest records are going
      to be merge in.
    :param field_spec: Dictionary with field names as keys and forced operation
      to perform as values. If a field is not present here, default operation
      will be performed. See _adjust_merged_values_orm method doc for all the
      available operators.
    :param method: Specify how to perform operations. By default or specifying
      'orm', operations will be performed with ORM, maybe slower, but safer, as
      related and computed fields will be recomputed on changes, and all
      constraints will be checked.
    :param delete: If set, the source ids will be unlinked.
    :exclude_columns: list of tuples (table, column) that will be ignored.
    :model_table: name of the model table. If not provided, got through ORM.
    """
    if exclude_columns is None:
        exclude_columns = []
    if field_spec is None and method == "orm":
        field_spec = {}
    if isinstance(record_ids, list):
        record_ids = tuple(record_ids)
    args0 = (env, model_name, record_ids, target_record_id)
    args = args0 + (exclude_columns,)
    if target_record_id in record_ids:
        raise Exception(
            "You can't put the target record in the list or records to be merged."
        )
    _change_generic(*args, method=method)  # pylint: disable=E1124
    if method == "orm":
        # Check which records to be merged exist
        record_ids = env[model_name].browse(record_ids).exists().ids
        if not record_ids:
            return
        if _check_recurrence(env, model_name, record_ids, target_record_id):
            return
        _change_many2one_refs_orm(*args)
        _change_many2many_refs_orm(*args)
        _change_reference_refs_orm(*args)
        _change_translations_orm(*args)
        args2 = args0 + (field_spec,)
        # TODO: serialized fields
        with env.norecompute():
            _adjust_merged_values_orm(*args2)
        if version_info[0] > 15:
            env[model_name].flush_model()
        else:
            env[model_name].recompute()
        if delete:
            _delete_records_orm(env, model_name, record_ids, target_record_id)
    else:
        # Check which records to be merged exist
        if not model_table:
            try:
                model_table = env[model_name]._table
            except KeyError:
                model_table = get_model2table(model_name)
        env.cr.execute(
            sql.SQL("SELECT id FROM {} WHERE id IN %s").format(
                sql.Identifier(model_table)
            ),
            (tuple(record_ids),),
        )
        record_ids = [x[0] for x in env.cr.fetchall()]
        if not record_ids:
            return
        if _check_recurrence(
            env, model_name, record_ids, target_record_id, model_table=model_table
        ):
            return
        args3 = args + (model_table,)
        _change_foreign_key_refs(*args3)
        _change_reference_refs_sql(*args)
        _change_translations_sql(*args)
        if field_spec is not None:
            args4 = args0 + (model_table,) + (field_spec,)
            _adjust_merged_values_sql(*args4)
        if delete:
            _delete_records_sql(
                env, model_name, record_ids, target_record_id, model_table=model_table
            )