shopinvader/odoo-shopinvader

View on GitHub
shopinvader/models/shopinvader_backend.py

Summary

Maintainability
C
1 day
Test Coverage
# -*- coding: utf-8 -*-
# Copyright 2017 Akretion (http://www.akretion.com).
# @author Sébastien BEAU <sebastien.beau@akretion.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).

from odoo import _, api, fields, models, tools
from odoo.http import request
from odoo.osv import expression

from odoo.addons.queue_job.job import job
from odoo.addons.server_environment import serv_config


class ShopinvaderBackend(models.Model):
    _name = "shopinvader.backend"

    name = fields.Char(required=True)
    company_id = fields.Many2one(
        "res.company",
        "Company",
        required=True,
        default=lambda s: s._default_company_id(),
    )
    location = fields.Char()
    notification_ids = fields.One2many(
        "shopinvader.notification", "backend_id", "Notification"
    )
    nbr_product = fields.Integer(compute="_compute_nbr_content")
    nbr_variant = fields.Integer(compute="_compute_nbr_content")
    nbr_category = fields.Integer(compute="_compute_nbr_content")
    allowed_country_ids = fields.Many2many(
        comodel_name="res.country", string="Allowed Country"
    )
    anonymous_partner_id = fields.Many2one(
        "res.partner",
        "Anonymous Partner",
        help=(
            "Provide partner settings for unlogged users "
            "(i.e. fiscal position)"
        ),
        required=True,
        default=lambda self: self.env.ref("shopinvader.anonymous"),
    )
    sequence_id = fields.Many2one(
        "ir.sequence", "Sequence", help="Naming policy for orders and carts"
    )
    auth_api_key_name = fields.Selection(
        required=False,  # required into the UI to allow demo data
        help="The name of the api_key section used for the authentication of"
        "calls to services dedicated to this backend",
        selection=lambda a: a._get_auth_api_key_name_selection(),
    )
    lang_ids = fields.Many2many("res.lang", string="Lang", required=True)
    pricelist_id = fields.Many2one("product.pricelist", string="Pricelist")

    account_analytic_id = fields.Many2one(
        comodel_name="account.analytic.account",
        string="Analytic account",
        help="This analytic account will be used to fill the "
        "field on the sale order created.",
    )
    filter_ids = fields.Many2many(
        comodel_name="product.filter", string="Filter"
    )
    use_shopinvader_product_name = fields.Boolean(
        string="Use Shopinvader product display name",
        help="If checked, use the specific shopinvader display name for "
        "products instead of the original product name.",
    )
    category_binding_level = fields.Integer(
        default=0,
        help="Define if the product binding should also bind related "
        "categories and how many related parents.\n"
        "Set 0 (or less) to disable the category auto-binding.\n"
        "Set 1 to auto-bind the direct category.\n"
        "Set 2 to auto-bind the direct category and his parent.\n"
        "etc.",
    )
    user_id = fields.Many2one(
        comodel_name="res.users",
        compute="_compute_user_id",
        help="The technical user used to process calls to the services "
        "provided by the backend",
    )
    website_public_name = fields.Char(
        help="Public name of your backend/website."
    )
    clear_cart_options = fields.Selection(
        selection=[
            ("delete", "Delete"),
            ("clear", "Clear"),
            ("cancel", "Cancel"),
        ],
        required=True,
        string="Clear cart",
        default="clear",
        help="Action to execute on the cart when the front want to clear the "
        "current cart:\n"
        "- Delete: delete the cart (and items);\n"
        "- Clear: keep the cart but remove items;\n"
        "- Cancel: The cart is canceled but kept into the database.\n"
        "It could be useful if you want to keep cart for "
        "statistics reasons. A new cart is created automatically when the "
        "customer will add a new item.",
    )
    simple_cart_service = fields.Boolean(
        help="If this option is checked, the add item action on frontend will"
        " either add a new line either increase qty but promotion, taxes,"
        " subtotal computations will be delegated to an asynchronous job."
        " It the customer wants to see his cart before its execution,"
        " the computation will be done on the fly to ensure data"
        " integrity"
    )
    authorize_not_bound_products = fields.Boolean(
        help="Check this if you want to authorize cart to display products"
        "that are not bound to this backend. This can be useful if"
        "you want to modify existing carts from backend."
    )

    _sql_constraints = [
        (
            "auth_api_key_name_uniq",
            "unique(auth_api_key_name)",
            "An authentication API Key can be used by only one backend.",
        )
    ]

    @api.model
    def _get_auth_api_key_name_selection(self):
        selection = []
        for section in serv_config.sections():
            if section.startswith("api_key_") and serv_config.has_option(
                section, "key"
            ):
                selection.append((section, section))
        return selection

    @api.depends("auth_api_key_name")
    @api.multi
    def _compute_user_id(self):
        for rec in self:
            section = rec.auth_api_key_name
            login_name = serv_config.get(section, "user")
            user_model = self.env["res.users"]
            if serv_config.has_option(section, "allow_inactive_user"):
                allow_inactive_user = serv_config.getboolean(
                    section, "allow_inactive_user"
                )
                if allow_inactive_user:
                    user_model = user_model.with_context(active_test=False)
            rec.user_id = user_model.search([("login", "=", login_name)])

    @api.model
    def _default_company_id(self):
        return self.env["res.company"]._company_default_get(
            "shopinvader.backend"
        )

    def _to_compute_nbr_content(self):
        """
        Get a dict to compute the number of content.
        The dict is build like this:
        Key = Odoo number fields string (should be Integer/Float)
        Value = The target model string
        :return: dict
        """
        values = {
            # key => Odoo field: value => related model
            "nbr_product": "shopinvader.product",
            "nbr_category": "shopinvader.category",
            "nbr_variant": "shopinvader.variant",
        }
        return values

    def _compute_nbr_content(self):
        to_count = self._to_compute_nbr_content()
        domain = [("backend_id", "in", self.ids)]
        for odoo_field, odoo_model in to_count.items():
            if odoo_model in self.env and self.env[odoo_model]._table_exist():
                target_model_obj = self.env[odoo_model]
                result = target_model_obj.read_group(
                    domain, ["backend_id"], ["backend_id"], lazy=False
                )
                result = {
                    data["backend_id"][0]: data["__count"] for data in result
                }
                for record in self:
                    record[odoo_field] = result.get(record.id, 0)

    def _bind_all_content(self, model, bind_model, domain):
        bind_model_obj = self.env[bind_model].with_context(active_test=False)
        model_obj = self.env[model]
        records = model_obj.search(domain)
        binds = bind_model_obj.search(
            [
                ("backend_id", "in", self.ids),
                ("record_id", "in", records.ids),
                ("lang_id", "in", self.mapped("lang_ids").ids),
            ]
        )
        for backend in self:
            for lang in backend.lang_ids:
                for record in records:
                    bind = fields.first(
                        binds.filtered(
                            lambda b: b.backend_id == backend
                            and b.record_id == record
                            and b.lang_id == lang
                        )
                    )
                    if not bind:
                        bind_model_obj.with_context(map_children=True).create(
                            {
                                "backend_id": backend.id,
                                "record_id": record.id,
                                "lang_id": lang.id,
                            }
                        )
                    elif not bind.active:
                        bind.write({"active": True})
        return True

    @api.multi
    def bind_all_product(self):
        result = self._bind_all_content(
            "product.template", "shopinvader.product", [("sale_ok", "=", True)]
        )
        self.auto_bind_categories()
        return result

    @api.multi
    def auto_bind_categories(self):
        """
        Auto bind product.category for binded shopinvader.product
        :return: bool
        """
        backends = self.filtered(lambda b: b.category_binding_level > 0)
        if not backends:
            return True
        all_products = self.env["shopinvader.variant"].search(
            [
                ("backend_id", "in", backends.ids),
                # Force to have only active binding
                ("active", "=", True),
            ]
        )
        for backend in backends:
            shopinv_variants = all_products.filtered(
                lambda p: p.backend_id == backend
            )
            products = shopinv_variants.mapped("record_id")
            categories = backend._get_related_categories(products)
            if categories:
                self._bind_all_content(
                    categories._name,
                    "shopinvader.category",
                    [("id", "in", categories.ids)],
                )
        return True

    @api.multi
    def _get_related_categories(self, products):
        """
        Get product.category to bind (based on current backend and
        given products)
        :param products: product recordset (product or template)
        :return: product.category recordset
        """
        self.ensure_one()
        # As we consume the first level (direct category), minus 1
        level = self.category_binding_level - 1
        categories = products.mapped("categ_id")
        # pull up until the correct level
        parent_categories = categories
        while level > 0:
            parent_categories = parent_categories.mapped("parent_id")
            categories |= parent_categories
            level -= 1
        return categories

    @api.multi
    def bind_all_category(self):
        self._bind_all_content("product.category", "shopinvader.category", [])

    def _send_notification(self, notification, record):
        self.ensure_one()
        record.ensure_one()
        notifs = self.env["shopinvader.notification"].search(
            [
                ("backend_id", "=", self.id),
                ("notification_type", "=", notification),
            ]
        )
        description = _("Notify %s for %s,%s") % (
            notification,
            record._name,
            record.id,
        )
        for notif in notifs:
            job_priority = notif.queue_job_priority
            # If < 0 => Live notification
            if job_priority < 0:
                notif.send(record.id)
            else:
                notif.with_delay(
                    description=description, priority=job_priority
                ).send(record.id)
        return True

    def _extract_configuration(self):
        return {}

    @classmethod
    def _get_api_key_name(cls, auth_api_key):
        for section in serv_config.sections():
            if section.startswith("api_key_") and serv_config.has_option(
                section, "key"
            ):
                if tools.consteq(
                    auth_api_key, serv_config.get(section, "key")
                ):
                    return section
        return None

    @api.model
    @tools.ormcache("self._uid", "auth_api_key")
    def _get_id_from_auth_api_key(self, auth_api_key):
        auth_api_key_name = self._get_api_key_name(auth_api_key)
        if auth_api_key_name:
            # filtered, not search because auth_api_key_name is
            # not a searchable field
            return (
                self.search([])
                .filtered(lambda r: r.auth_api_key_name == auth_api_key_name)
                .id
            )
        return False

    @api.model
    def _get_from_http_request(self):
        auth_api_key = getattr(request, "auth_api_key", None)
        return self.browse(self._get_id_from_auth_api_key(auth_api_key))

    @api.multi
    @job(default_channel="root.shopinvader")
    def _job_sale_price_update(self, sales):
        """
        Jobify the process to update prices.
        After launching the price update, we also have to re-apply promotions
        (in case of the promotion change and conditions doesn't match anymore).
        Could be inherited to add others prices recompute.
        :param sales: sale.order recordset
        :return: None
        """
        sales._update_pricelist_and_update_line_prices()

    @api.multi
    @job(default_channel="root.shopinvader")
    def _job_split_sale_price_update(self, sales):
        """
        Split the current job on many SO to 1 job per SO.
        To avoid rollback full price update in case of error.
        Better to have 1 SO bad recomputed instead of all SO.
        :param sales: sale.order recordset
        :return: bool
        """
        for sale in sales:
            description = "Recompute prices for cart %s" % sale.display_name
            self.with_delay(description=description)._job_sale_price_update(
                sale
            )
        return True

    @api.model
    def _launch_sale_price_update(self, domain=False):
        """
        Retrieve cart to update then apply the recalculation
        (could be used for a cron)
        :param domain: list/domain
        :return: bool
        """
        domain = domain or []
        if domain:
            domain = expression.normalize_domain(domain)
        sale_domain = expression.normalize_domain(
            [("typology", "=", "cart"), ("state", "=", "draft")]
        )
        domain = expression.AND([domain, sale_domain])
        sale_carts = self.env["sale.order"].search(domain)
        if sale_carts:
            description = "Recompute prices for carts (split: 1 job per cart)"
            return self.with_delay(
                description=description
            )._job_split_sale_price_update(sale_carts)
        return True