shopinvader/odoo-shopinvader

View on GitHub
shopinvader/services/cart.py

Summary

Maintainability
D
2 days
Test Coverage
# -*- coding: utf-8 -*-
# Copyright 2016 Akretion (http://www.akretion.com)
# Sébastien BEAU <sebastien.beau@akretion.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
# pylint: disable=consider-merging-classes-inherited
import logging

from cerberus import Validator
from werkzeug.exceptions import NotFound

from odoo.exceptions import UserError
from odoo.tools.translate import _

from odoo.addons.base_rest.components.service import to_int
from odoo.addons.component.core import Component

from .. import shopinvader_response

_logger = logging.getLogger(__name__)


class CartService(Component):
    _inherit = "shopinvader.abstract.sale.service"
    _name = "shopinvader.cart.service"
    _usage = "cart"

    @property
    def cart_recompute_identify_key(self):
        return "sale.order._shopinvader_delayed_recompute.%s" % self.cart_id

    @property
    def cart_id(self):
        return self.shopinvader_session.get("cart_id", 0)

    # The following method are 'public' and can be called from the controller.

    def search(self):
        """Return the cart that have been set in the session or
           search an existing cart for the current partner"""
        if not self.cart_id:
            return {}
        # If the cart_id doesn't exist anymore, we don't have to create a new
        return self._to_json(self._get(create_if_not_found=False))

    def update(self, **params):
        cart = self._get()
        response = self._update(cart, params)
        if response.get("redirect_to"):
            return response
        else:
            return self._to_json(cart)

    def add_item(self, **params):
        """
        Add item to cart.
        Don't access to cart fields in this method. Do it in _add_item.
        The cart has to be recomputed
        :param params:
        :return:
        """
        simple_service = self.shopinvader_backend.simple_cart_service
        cart = self._get()
        if not cart:
            cart = self._create_empty_cart()
        # Modify the cart with no recomputation
        with self.env.norecompute():
            item = self._add_item(cart, params)
        self._launch_cart_recompute(cart, item)
        return self._to_json(cart, simple=simple_service)

    def update_item(self, **params):
        cart = self._get()
        self._update_item(cart, params)
        return self._to_json(cart)

    def delete_item(self, **params):
        cart = self._get()
        self._delete_item(cart, params)
        return self._to_json(cart)

    def clear(self):
        """
        Clear the current cart (by $session)
        :return: dict/json
        """
        cart = self._get()
        cart = self._clear_cart(cart)
        return self._to_json(cart)

    def _get_validator_cart_by_id_domain(self, value):
        """
        Get cart by id domain. Limiting to session partner.
        :param value:
        :return:
        """
        return [("partner_id", "=", self.partner.id), ("id", "=", value)]

    def _get_line_copy_vals(self, line):
        """
        Prepare copy values to be passed to _add_item
        :param line: sale.order.line
        :return: dict
        """
        return {
            "product_id": line.product_id.id,
            "item_qty": line.product_uom_qty,
        }

    def _get_lines_to_copy(self, cart):
        return cart.order_line

    # pylint: disable=W8102,W8106
    def copy(self, **params):
        """
        This service allows
        :param params:
        :return:
        """
        return self._copy(**params)

    def _copy(self, **params):
        """
        Copy the cart given by id without the lines
        They will be re-added
        :return: dict/json
        """
        cart = self.env["sale.order"].search(
            self._get_validator_cart_by_id_domain(params.get("id"))
        )
        # Copy the existing cart
        # Delete all lines and re-add them with 'shopinvader' flavour
        new_cart = cart.copy({"order_line": False, "typology": "cart"})
        for line in self._get_lines_to_copy(cart):
            vals = self._get_line_copy_vals(line)
            self._add_item(new_cart, vals)

        return self._to_json(new_cart)

    # Validator
    def _cart_validator_exists(self, field, value, error):
        """
        Implements 'check_with' validation
        :param field:
        :param value:
        :param error:
        :return:
        """
        cart = self.env["sale.order"].search(
            self._get_validator_cart_by_id_domain(value)
        )
        if len(cart) != 1:
            error(
                field, _("The cart does not exists or does not belong to you!")
            )

    def _validator_copy(self):
        return {
            "id": {
                "coerce": to_int,
                "required": True,
                "type": "integer",
                "check_with": self._cart_validator_exists,
            }
        }

    def _validator_return_copy(self):
        return Validator({}, allow_unknown=True)

    def _validator_search(self):
        return {}

    def _validator_clear(self):
        return {}

    def _subvalidator_shipping(self):
        return {
            "type": "dict",
            "schema": {
                "address": {
                    "type": "dict",
                    "schema": {"id": {"coerce": to_int}},
                }
            },
        }

    def _subvalidator_invoicing(self):
        return {
            "type": "dict",
            "schema": {
                "address": {
                    "type": "dict",
                    "schema": {"id": {"coerce": to_int}},
                }
            },
        }

    def _subvalidator_step(self):
        return {
            "type": "dict",
            "schema": {
                "current": {"type": "string"},
                "next": {"type": "string"},
            },
        }

    def _validator_update(self):
        return {
            "step": self._subvalidator_step(),
            "shipping": self._subvalidator_shipping(),
            "invoicing": self._subvalidator_invoicing(),
            "note": {"type": "string"},
        }

    def _validator_add_item(self):
        return {
            "product_id": {
                "coerce": to_int,
                "required": True,
                "type": "integer",
            },
            "item_qty": {"coerce": float, "required": True, "type": "float"},
        }

    def _validator_update_item(self):
        return {
            "item_id": {"coerce": to_int, "required": True, "type": "integer"},
            "item_qty": {"coerce": float, "required": True, "type": "float"},
        }

    def _validator_delete_item(self):
        return {
            "item_id": {"coerce": to_int, "required": True, "type": "integer"}
        }

    # The following method are 'private' and should be never never NEVER call
    # from the controller.
    # All params are trusted as they have been checked before

    def _launch_cart_recompute(self, cart, item):
        """
        Launches cart recompute depending the backend configuration
        :param cart:
        :param item:
        :return:
        """
        simple_service = self.shopinvader_backend.simple_cart_service
        if simple_service:
            # Recompute cart asynchronously to avoid latencies on frontend
            description = "Recompute cart %s" % (item.order_id.id)
            item.order_id.with_delay(
                description=description,
                priority=1,
                identity_key=self.cart_recompute_identify_key,
            )._shopinvader_delayed_recompute()
        else:
            cart.recompute()
            item.order_id.shopinvader_to_be_recomputed = False

    def _upgrade_cart_item_quantity(self, cart, item, product_qty):
        vals = {"product_uom_qty": product_qty}
        new_values = item.play_onchanges(vals, vals.keys())
        # clear cache after play onchange
        real_line_ids = [line.id for line in cart.order_line if line.id]
        cart._cache["order_line"] = tuple(real_line_ids)
        vals.update(new_values)
        item.write(vals)

    def _do_clear_cart_cancel(self, cart):
        """
        Cancel the existing cart.
        Don't need to create a new one because it'll done automatically
        when the customer will add a new item.
        :param cart: sale.order recordset
        :return: sale.order recordset
        """
        cart.action_cancel()
        return cart.browse()

    def _do_clear_cart_delete(self, cart):
        """
        Delete/unlink the given cart
        :param cart: sale.order recordset
        :return: sale.order recordset
        """
        cart.unlink()
        return cart.browse()

    def _do_clear_cart_clear(self, cart):
        """
        Remove items from given cart.
        :param cart: sale.order recordset
        :return: sale.order recordset
        """
        cart.write({"order_line": [(5, False, False)]})
        return cart

    def _clear_cart(self, cart):
        """
        Action to clear the cart, depending on the backend configuration.
        :param cart: sale.order recordset
        :return: sale.order recordset
        """
        clear_option = self.shopinvader_backend.clear_cart_options
        do_clear = "_do_clear_cart_%s" % clear_option
        if hasattr(self, do_clear):
            cart = getattr(self, do_clear)(cart)
        else:
            _logger.error("The %s function doesn't exists.", do_clear)
            raise NotImplementedError(_("Missing feature to clear the cart!"))
        return cart

    def _add_item(self, cart, params):
        existing_item = self._check_existing_cart_item(cart, params)
        if existing_item:
            qty = existing_item.product_uom_qty + params["item_qty"]
            self._upgrade_cart_item_quantity(cart, existing_item, qty)
        else:
            vals = self._prepare_cart_item(params, cart)
            new_values = self._sale_order_line_onchange(vals)
            vals.update(new_values)
            # As the frontend could be in several languages but we have only
            # one anonymous parnter with his language set, we need to ensure
            # that description on the line is in the right language
            partner = cart.partner_id
            ctx_lang = self.env.context.get("lang", partner.lang)
            if partner.lang != ctx_lang:
                product_id = vals["product_id"]
                vals["name"] = self._get_sale_order_line_name(product_id)
            existing_item = self.env["sale.order.line"].create(vals)
        existing_item.order_id.shopinvader_to_be_recomputed = True
        return existing_item

    def _get_sale_order_line_name(self, product_id):
        product = self.env["product.product"].browse(product_id)
        name = product.name_get()[0][1]
        if product.description_sale:
            name += "\n" + product.description_sale
        return name

    def _sale_order_line_onchange(self, vals):
        """
        Simulate the onchange on sale.order.line with given vals.
        :param vals: dict
        :return: dict
        """
        # the next statement is done with suspending the security for
        #  performance reasons. It is safe only if both 3 following
        # fields are filled on the sale order:
        # - company_id
        # - fiscal_position_id
        # - pricelist_id
        so_line_obj = self.env["sale.order.line"].suspend_security()
        new_values = so_line_obj.play_onchanges(vals, vals.keys())
        return new_values

    def _update_item(self, cart, params, item=False):
        if not item:
            item = self._get_cart_item(cart, params, raise_if_not_found=False)
        if item:
            self._upgrade_cart_item_quantity(cart, item, params["item_qty"])
            return
        # The item id is maybe the one from a previous cart.
        line_id = params["item_id"]
        line = self.env["sale.order.line"].search(
            [
                ("id", "=", line_id),
                ("order_id.partner_id", "=", cart.partner_id.id),
            ]
        )
        if line:
            # silently create a new line on the new cart from the previous
            # line. This case could occurs if the customer click on the add
            # button from within an old session still open in its browser
            add_item_params = self._prepare_add_item_params_from_line(line)
            add_item_params["item_qty"] = params["item_qty"]
            self._add_item(cart, add_item_params)
            return params["item_qty"]
        raise NotFound("No cart item found with id %s" % params["item_id"])

    def _delete_item(self, cart, params):
        item = self._get_cart_item(cart, params, raise_if_not_found=False)
        if item:
            item.unlink()

    def _prepare_add_item_params_from_line(self, sale_order_line):
        return {"product_id": sale_order_line.product_id.id, "item_qty": 1}

    def _prepare_shipping(self, shipping, params):
        if "address" in shipping:
            address = shipping["address"]
            # By default we always set the invoice address with the
            # shipping address, if you want a different invoice address
            # just pass it
            params["partner_shipping_id"] = address["id"]
            params["partner_invoice_id"] = params["partner_shipping_id"]

    def _prepare_invoicing(self, invoicing, params):
        if "address" in invoicing:
            params["partner_invoice_id"] = invoicing["address"]["id"]

    def _prepare_step(self, step, params):
        if "next" in step:
            params["current_step_id"] = self._get_step_from_code(
                step["next"]
            ).id
        if "current" in step:
            params["done_step_ids"] = [
                (4, self._get_step_from_code(step["current"]).id, 0)
            ]

    def _prepare_update(self, cart, params):
        if "shipping" in params:
            self._prepare_shipping(params.pop("shipping"), params)
        if "invoicing" in params:
            self._prepare_invoicing(params.pop("invoicing"), params)
        if "step" in params:
            self._prepare_step(params.pop("step"), params)
        return params

    def _update(self, cart, params):
        params = self._prepare_update(cart, params)
        if params:
            cart.write_with_onchange(params)
        return {}

    def _get_step_from_code(self, code):
        step = self.env["shopinvader.cart.step"].search([("code", "=", code)])
        if not step:
            raise UserError(_("Invalid step code %s") % code)
        else:
            return step

    def _to_json_simple(self, cart):
        """
        Returns simple and fast items
        :return: dict
        """
        cart_simple = cart.with_context(prefetch_fields=False)
        qty = sum(
            line.product_uom_qty
            for line in cart_simple.order_line
            if self._is_item(line)
        )
        return {"id": cart.id, "lines": {"count": qty}}

    def _to_json(self, cart, simple=False):
        """
        Return cart json depending on which type of service (simple or not)
        Modify Session response accordingly
        :param cart:
        :param simple:
        :return:
        """
        response = shopinvader_response.get()
        if not cart:
            response.set_session("cart_id", 0)
            response.set_store_cache("cart", {})
            return {"data": {}}
        if simple:
            res = self._to_json_simple(cart)
        else:
            res = super(CartService, self)._to_json(cart)[0]
        response.set_session("cart_id", res["id"])
        response.set_store_cache("cart", res)
        return {"data": res}

    def _get(self, create_if_not_found=True):
        """

        :return: sale.order recordset (cart)
        """
        cart = self.env["sale.order"].browse()
        if self.cart_id:
            # here we take advantage of the cache. If the cart has been
            # already loaded, no SQL query will be issued
            # an alternative would be to build a domain with the expected
            # criteria on the cart but in this case, each time the _get method
            # would have been called, a new SQL query would have been done
            cart = self.env["sale.order"].browse(self.cart_id).exists()
            # Recompute cart if needed (in case of simple service call)
            if cart:
                cart.shopinvader_recompute()
        if (
            cart.shopinvader_backend_id == self.shopinvader_backend
            and cart.typology == "cart"
            and cart.state == "draft"  # ensure that we only work on draft
        ):
            return cart
        if create_if_not_found:
            return self._create_empty_cart()
        return cart

    def _create_empty_cart(self):
        vals = self._prepare_cart()
        return self.env["sale.order"].create(vals)

    def _prepare_cart(self):
        partner = self.partner or self.shopinvader_backend.anonymous_partner_id
        vals = {
            "typology": "cart",
            "partner_id": partner.id,
            "partner_shipping_id": partner.id,
            "partner_invoice_id": partner.id,
            "shopinvader_backend_id": self.shopinvader_backend.id,
        }
        vals.update(self.env["sale.order"].play_onchanges(vals, vals.keys()))
        if self.shopinvader_backend.account_analytic_id.id:
            vals[
                "project_id"
            ] = self.shopinvader_backend.account_analytic_id.id
        if self.shopinvader_backend.pricelist_id:
            # We must always force the pricelist. In the case of sale_profile
            # the pricelist is not set on the backend
            vals.update(
                {"pricelist_id": self.shopinvader_backend.pricelist_id.id}
            )
        if self.shopinvader_backend.sequence_id:
            vals["name"] = self.shopinvader_backend.sequence_id._next()
        return vals

    def _get_onchange_trigger_fields(self):
        return ["partner_id", "partner_shipping_id", "partner_invoice_id"]

    def _check_call_onchange(self, params):
        onchange_fields = self._get_onchange_trigger_fields()
        for changed_field in params.keys():
            if changed_field in onchange_fields:
                return True
        return False

    def _get_cart_item(self, cart, params, raise_if_not_found=True):
        # We search the line based on the item id and the cart id
        # indeed the item_id information is given by the
        # end user (untrusted data) and the cart id by the
        # locomotive server (trusted data)
        item = cart.mapped("order_line").filtered(
            lambda l, id_=params["item_id"]: l.id == id_
        )
        if not item and raise_if_not_found:
            raise NotFound("No cart item found with id %s" % params["item_id"])
        return item

    def _check_existing_cart_item(self, cart, params):
        product_id = params["product_id"]
        order_lines = cart.order_line
        return order_lines.filtered(
            lambda l, p=product_id: l.product_id.id == product_id
        )

    def _prepare_cart_item(self, params, cart):
        return {
            "product_id": params["product_id"],
            "product_uom_qty": params["item_qty"],
            "order_id": cart.id,
        }

    def _load_target_email(self, record_id):
        """
        As this service doesn't have a _expose_model, we have to do it manually
        :param record_id: int
        :return: record or None
        """
        return self.env["sale.order"].browse(record_id)

    def _get_openapi_default_parameters(self):
        defaults = super(CartService, self)._get_openapi_default_parameters()
        defaults.append(
            {
                "name": "SESS-CART-ID",
                "in": "header",
                "description": "Session Cart Identifier",
                "required": False,
                "schema": {"type": "integer"},
                "style": "simple",
            }
        )
        return defaults