biosustain/optlang

View on GitHub
src/optlang/matrix_interface.py

Summary

Maintainability
F
3 days
Test Coverage
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Generic solver interface for (sparse) matrix solvers.

This interface is only supposed to be a basis to add matrix solvers to optlang. It
creates a thin layer between all optlang operations and the standard formulation of
LP, QP, and MILP problems.

To create a solver from this, you need to do the following:

1. Create a class that inherits from MatrixProblem and implement the `solve` method
   and optionally overwrite the `reset` and `still_valid` methods.

2. Create derived classes for Variable, Constraint, Objective, and Configuration.
   You don't need to overwrite anything. Those should work out of the box.

3. Create a derived class for Model and set the `ProblemClass` attribute to the class
   from (1) and the `status_map` to map the solver status to an Optlang status from
   `optlang.interface`.

For an example take a look at `optlang.osqp_interface`.
"""

import abc
import logging
import pickle
from collections import defaultdict
from typing import NamedTuple

from numpy import Infinity, array, concatenate, zeros
from scipy.sparse import csc_matrix

from optlang import available_solvers, interface, symbolics
from optlang.exceptions import SolverError
from optlang.expression_parsing import parse_optimization_expression
from optlang.symbolics import add, mul

log = logging.getLogger(__name__)

_STATUS_MAP = defaultdict(lambda: interface.UNDEFINED)

_LP_METHODS = ("auto", )

_QP_METHODS = ("auto", )

_TYPES = ("continuous", "binary", "integer")


class SparseProblem(NamedTuple):
    """A representation of the convex optimizatgion problem in standard form.

    This defines the problem in the form.

    ..math::

      \text{minimize }& \frac{1}{2}x^T\mathbf{P}x + q^Tx \\
      \text{s.t.: }& bounds_{.0} \leq \mathbf{A} \leq bounds_{.1} \\
      & vbounds_{.0} \leq x \leq vbounds_{.1} \\
      & \{x_k \in \mathbb{N}^0 \text{ if } integer_k = 1 \}

    Attributes
    ----------
    P : csc_matrix
        A semidefinite positive matrix specifying the coefficients for the quadratic
        objective.
    q : array
        A vector specfiying the linear objective coefficients.
    A : csc_matrix
        The constraint matrix.
    bounds : array
        The lower and upper bounds corresponding to the rows in `A`.
    vbounds : array
        The lower and upper bounds of the variables `x`.
    integer : array
        Indicator vector denoting if `x[k]` is integer.
    """
    P : csc_matrix
    q : array
    A : csc_matrix
    bounds : array
    vbounds : array
    integer : array


class MatrixProblem(abc.ABC):
    """A concise representation of an LP/QP/MILP problem.

    This class assumes that the problem will be pretty much immutable. This is
    a small intermediate layer based on dictionaries that is fast to modify
    but can also be converted to a matrix formulation quickly.
    """

    def __init__(self, **kwargs):
        self.variables = set()
        self.integer_vars = set()
        self.constraints = set()
        self.constraint_coefs = dict()
        self.constraint_lbs = dict()
        self.constraint_ubs = dict()
        self.variable_lbs = dict()
        self.variable_ubs = dict()
        self.obj_linear_coefs = dict()
        self.obj_quadratic_coefs = dict()
        self.primals = {}
        self.cprimals = {}
        self.duals = {}
        self.vduals = {}
        self.obj_value = None
        self.direction = -1
        self.info = None
        self.status = None
        self._solution = None
        self.default_settings()

    def default_settings(self):
        """Set the default settings of the solver.

        Returns
        -------
        dict
            The default settings for the solver.
        """
        self.settings = {
            "presolve": True,
            "lp_method": "auto",
            "qp_method": "auto",
            "time_limit": 0,
            "primal_inf_tolerance": 1e-6,
            "dual_inf_tolerance": 1e-6,
            "optimality_tolerance": 1e-6,
            "mip_tolerance": 1e-6,
            "verbose": 1,
            "threads": 1,
            "iteration_limit": 100000
        }

    def build(self, add_variable_constraints=False):
        """Build the problem instance.

        Parameters
        ----------
        add_variable_constraints : bool
            Whether to add constraints for each individual variables as a (sparse)
            block diagonal to the constraint matrix A.

        Returns
        -------
        tuple of [P, q, A, bounds, vbounds, integer] with the following components:
            - P : Quadratic objective coefficients as an upper triangular CSC matrix
            - q : Linear objective coefficients as an array.
            - A : Constraint matrix as a CSC matrix.
            - bounds : Constraint lower and upper bounds as a 2xNc matrix.
            - vbounds : Variable bounds as a 2xNv matrix.
            - integer : Binary array indicating whether variable i is integer
                        (for MIPs).

        """
        d = self.direction
        vmap = dict(zip(self.variables, range(len(self.variables))))
        nv = len(self.variables)
        cmap = dict(zip(self.constraints, range(len(self.constraints))))
        nc = len(self.constraints)
        if len(self.obj_quadratic_coefs) > 0:
            P = array(
                [
                    [vmap[vn[0]], vmap[vn[1]], coef * d * 2.0]
                    for vn, coef in self.obj_quadratic_coefs.items()
                ]
            )
            P = csc_matrix(
                (P[:, 2], (P[:, 0].astype("int64"), P[:, 1].astype("int64"))),
                shape=(nv, nv),
            )
        else:
            P = None
        q = zeros(nv)
        q[[vmap[vn] for vn in self.obj_linear_coefs]] = list(
            self.obj_linear_coefs.values()
        )
        q = q * d
        vbounds = array(
            [[self.variable_lbs[vn], self.variable_ubs[vn]] for vn in self.variables]
        )

        if len(self.constraint_coefs) > 0:
            A = array(
                [
                    [cmap[vn[0]], vmap[vn[1]], coef]
                    for vn, coef in self.constraint_coefs.items()
                ]
            )
            bounds = array(
                [
                    [self.constraint_lbs[cn], self.constraint_ubs[cn]]
                    for cn in self.constraints
                ]
            )
            if add_variable_constraints:
                Av = array([[vmap[k] + nc, vmap[k], 1.0] for k in self.variables])
                A = concatenate((A, Av))
                bounds = concatenate((bounds, vbounds))
            A = csc_matrix(
                (A[:, 2], (A[:, 0].astype("int64"), A[:, 1].astype("int64"))),
                shape=(bounds.shape[0], nv),
            )
        elif add_variable_constraints:
            Av = array([[vmap[k], vmap[k], 1.0] for k in self.variables])
            A = csc_matrix(
                (Av[:, 2], (Av[:, 0].astype("int64"), Av[:, 1].astype("int64"))),
                shape=(nv, nv),
            )
            bounds = vbounds
        else:
            A = None
            bounds = None

        integer = array([int(vn in self.integer_vars) for vn in self.variables])
        return SparseProblem(P, q, A, bounds, vbounds, integer)

    @abc.abstractmethod
    def solve(self):
        """Solve the problem.

        This will return nothing but will fill the solution attributes.

        Returns
        -------
        Nothing.
        """
        raise NotImplementedError("This needs to be overwritten by the child class.")

    def reset(self, everything=False):
        """Reset the public solver solution.

        Parameters
        ----------
        everything : bool
            If true will also clear cached solution bases or warm start solutions.

        Returns
        -------
        Nothing.
        """
        self.info = None
        self.primals = {}
        self.cprimals = {}
        self.duals = {}
        self.vduals = {}
        if everything:
            self._solution = None

    @abc.abstractmethod
    def still_valid(self, A, bounds):
        """Check if previous solutions is still feasible.

        Parameters
        ----------
        A : numpy.csc_matrix
            The constraint matrix A.
        bounds : numpy.array
            The constraint bounds.

        Returns
        -------
        bool
            Whether the current solution still fulfills the bounds.

        """
        raise NotImplementedError("This needs to be overwritten by the child class.")

    def prune(self):
        """Remove unused variables and constraints."""
        self.reset()
        self.variable_lbs = {
            k: v for k, v in self.variable_lbs.items() if k in self.variables
        }
        self.variable_ubs = {
            k: v for k, v in self.variable_ubs.items() if k in self.variables
        }
        self.constraint_coefs = {
            k: v
            for k, v in self.constraint_coefs.items()
            if k[0] in self.constraints and k[1] in self.variables
        }
        self.obj_linear_coefs = {
            k: v for k, v in self.obj_linear_coefs.items() if k in self.variables
        }
        self.obj_quadratic_coefs = {
            k: v
            for k, v in self.obj_quadratic_coefs.items()
            if k[0] in self.variables and k[1] in self.variables
        }
        self.constraint_lbs = {
            k: v for k, v in self.constraint_lbs.items() if k in self.constraints
        }
        self.constraint_ubs = {
            k: v for k, v in self.constraint_ubs.items() if k in self.constraints
        }
        self.integer_vars = {v for v in self.integer_vars if v in self.variables}

    def rename_constraint(self, old, new):
        """Rename a constraint.

        Parameters
        ----------
        old : str
            The old name of the constraint.
        new : str
            The new name of the constraint.

        Returns
        -------
        Nothing.

        """
        self.reset()
        self.constraints.remove(old)
        self.constraints.add(new)
        name_map = {k: k for k in self.constraints}
        name_map[old] = new
        self.constraint_coefs = {
            (name_map[k[0]], k[1]): v for k, v in self.constraint_coefs.items()
        }
        self.constraint_lbs[new] = self.constraint_lbs.pop(old)
        self.constraint_ubs[new] = self.constraint_ubs.pop(old)

    def rename_variable(self, old, new):
        """Rename a variable.

        Parameters
        ----------
        old : str
            The old name of the variable.
        new : str
            The new name of the variable.

        Returns
        -------
        Nothing.

        """
        self.reset()
        self.variables.remove(old)
        self.variables.add(new)
        name_map = {k: k for k in self.variables}
        name_map[old] = new
        self.constraint_coefs = {
            (k[0], name_map[k[1]]): v for k, v in self.constraint_coefs.items()
        }
        self.obj_quadratic_coefs = {
            (name_map[k[0]], name_map[k[1]]): v
            for k, v in self.obj_quadratic_coefs.items()
        }
        self.obj_linear_coefs = {
            name_map[k]: v for k, v in self.obj_linear_coefs.items()
        }
        self.variable_lbs[new] = self.variable_lbs.pop(old)
        self.variable_ubs[new] = self.variable_ubs.pop(old)
        if old in self.integer_vars:
            self.integer_vars.remove(old)
            self.integer_vars.add(new)


class Variable(interface.Variable):
    def __init__(self, name, *args, **kwargs):
        super().__init__(name=name, **kwargs)

    @interface.Variable.type.setter
    def type(self, value):
        if self.problem is not None:
            if value not in _TYPES:
                raise ValueError(
                    "This interface cannot handle variables of type '%s'. " % value
                    + "The following variable types are available: "
                    + ", ".join(_TYPES)
                )
            if value == "binary":
                self.lb = 0
                self.ub = 1
                self.problem.problem.integer_vars.add(self.name)
            elif value == "integer":
                self.problem.problem.integer_vars.add(self.name)
            elif value == "continuous":
                if self.name in self.problem.problem.integer_vars:
                    self.problem.problem.integer_vars.remove(self.name)
        super(Variable, Variable).type.fset(self, value)

    def _get_primal(self):
        return self.problem.problem.primals.get(self.name, None)

    @property
    def dual(self):
        if self.problem.is_integer:
            raise ValueError("Dual values are not well-defined for integer problems")
        if self.problem is not None:
            return self.problem.problem.vduals.get(self.name, None)
        return None

    @interface.Variable.name.setter
    def name(self, value):
        old_name = getattr(self, "name", None)
        super(Variable, Variable).name.fset(self, value)
        if getattr(self, "problem", None) is not None:
            if old_name != value:
                self.problem.problem.rename_variable(old_name, value)


class Constraint(interface.Constraint):
    _INDICATOR_CONSTRAINT_SUPPORT = False

    def __init__(self, expression, sloppy=False, **kwargs):
        super().__init__(expression=expression, sloppy=sloppy, **kwargs)

    def set_linear_coefficients(self, coefficients):
        if self.problem is not None:
            self.problem.update()
            self.problem.problem.reset()
            for var, coef in coefficients.items():
                self.problem.problem.constraint_coefs[(self.name, var.name)] = float(
                    coef
                )
        else:
            raise Exception(
                "Can't change coefficients if constraint is not associated "
                "with a model."
            )

    def get_linear_coefficients(self, variables):
        if self.problem is not None:
            self.problem.update()
            coefs = {
                v: self.problem.problem.constraint_coefs.get((self.name, v.name), 0.0)
                for v in variables
            }
            return coefs
        else:
            raise Exception(
                "Can't get coefficients from solver if constraint is not " "in a model"
            )

    def _get_expression(self):
        if self.problem is not None:
            variables = self.problem._variables
            all_coefs = self.problem.problem.constraint_coefs
            coefs = [(v, all_coefs.get((self.name, v.name), 0.0)) for v in variables]
            expression = add([mul((symbolics.Real(co), v)) for (v, co) in coefs])
            self._expression = expression
        return self._expression

    @property
    def problem(self):
        return self._problem

    @problem.setter
    def problem(self, value):
        if value is None:
            # Update expression from solver instance one last time
            self._get_expression()
        self._problem = value

    @property
    def primal(self):
        if self.problem is None:
            return None
        return self.problem.problem.cprimals.get(self.name, None)

    @property
    def dual(self):
        if self.problem.is_integer:
            raise ValueError("Dual values are not well-defined for integer problems")
        if self.problem is None:
            return None
        d = self.problem.problem.duals.get(self.name, None)
        return d

    @interface.Constraint.name.setter
    def name(self, value):
        old_name = self.name
        super(Constraint, Constraint).name.fset(self, value)
        if getattr(self, "problem", None) is not None:
            if old_name != value:
                self.problem.problem.rename_constraint(old_name, value)

    @interface.Constraint.lb.setter
    def lb(self, value):
        self._check_valid_lower_bound(value)
        if getattr(self, "problem", None) is not None:
            lb = -Infinity if value is None else float(value)
            self.problem.problem.constraint_lbs[self.name] = lb
        self._lb = value

    @interface.Constraint.ub.setter
    def ub(self, value):
        self._check_valid_upper_bound(value)
        if getattr(self, "problem", None) is not None:
            ub = Infinity if value is None else float(value)
            self.problem.problem.constraint_ubs[self.name] = ub
        self._ub = value

    def __iadd__(self, other):
        # if self.problem is not None:
        #     self.problem._add_to_constraint(self.index, other)
        if self.problem is not None:
            problem_reference = self.problem
            self.problem._remove_constraint(self)
            super().__iadd__(other)
            problem_reference._add_constraint(self, sloppy=False)
        else:
            super().__iadd__(other)
        return self


class Objective(interface.Objective):
    def __init__(self, expression, sloppy=False, **kwargs):
        super().__init__(expression=expression, sloppy=sloppy, **kwargs)
        self._expression_expired = False
        if not (sloppy or self.is_Linear or self.is_Quadratic):
            raise ValueError(
                "This interface only supports linear and quadratic objectives.")

    @property
    def value(self):
        if getattr(self, "problem", None) is None:
            return None
        if self.problem.problem.obj_value is None:
            return None
        return self.problem.problem.obj_value + self.problem._objective_offset

    @interface.Objective.direction.setter
    def direction(self, value):
        super(Objective, Objective).direction.__set__(self, value)
        if value == "min":
            self.problem.problem.direction = 1
        else:
            self.problem.problem.direction = -1

    def _get_expression(self):
        if (
            self.problem is not None
            and self._expression_expired
            and len(self.problem._variables) > 0
        ):
            model = self.problem
            vars = model._variables
            expression = add(
                [
                    coef * vars[vn]
                    for vn, coef in model.problem.obj_linear_coefs.items()
                ]
            )
            q_ex = add(
                [
                    coef * vars[vn[0]] * vars[vn[1]]
                    for vn, coef in model.problem.obj_quadratic_coefs.items()
                ]
            )
            expression += q_ex
            self._expression = expression
            self._expression_expired = False
        return self._expression

    def set_linear_coefficients(self, coefficients):
        if self.problem is not None:
            self.problem.update()
            for v, coef in coefficients.items():
                self.problem.problem.obj_linear_coefs[v.name] = float(coef)
            self._expression_expired = True
        else:
            raise Exception(
                "Can't change coefficients if objective is not associated "
                "with a model."
            )

    def get_linear_coefficients(self, variables):
        if self.problem is not None:
            self.problem.update()
            coefs = {
                v: self.problem.problem.obj_linear_coefs.get(v.name, 0.0)
                for v in variables
            }
            return coefs
        else:
            raise Exception(
                "Can't get coefficients from solver if objective " "is not in a model"
            )


class Configuration(interface.MathematicalProgrammingConfiguration):
    lp_methods = _LP_METHODS
    qp_methods = _QP_METHODS

    def __init__(
        self,
        lp_method="auto",
        presolve=False,
        verbosity=0,
        timeout=None,
        qp_method="auto",
        **kwargs
    ):
        super().__init__(**kwargs)
        self.lp_method = lp_method
        self.presolve = presolve
        self.verbosity = verbosity
        self.timeout = timeout
        self.qp_method = qp_method
        if "tolerances" in kwargs:
            for key, val in kwargs["tolerances"].items():
                if key in self._tolerance_functions():
                    setattr(self.tolerances, key, val)

    @property
    def lp_method(self):
        """The algorithm used to solve LP problems."""
        return self.problem.problem.settings["lp_method"]

    @lp_method.setter
    def lp_method(self, value):
        if value not in self.lp_methods:
            raise ValueError(
                "LP Method %s is not valid (choose one of: %s)"
                % (value, ", ".join(self.lp_methods))
            )
        if getattr(self, "problem", None) is not None:
            self.problem.problem.settings["lp_method"] = value
        self._lp_method = value

    def _set_presolve(self, value):
        if getattr(self, "problem", None) is not None:
            if value in [True, False, "auto"]:
                self.problem.problem.settings["presolve"] = value
            else:
                raise ValueError(
                    str(value) + " is not a valid presolve parameter. Must be True, "
                    "False or 'auto'."
                )

    @property
    def presolve(self):
        return self.problem.problem.settings["presolve"]

    @presolve.setter
    def presolve(self, value):
        self._set_presolve(value)
        self._presolve = value

    @property
    def verbosity(self):
        return self._verbosity

    @verbosity.setter
    def verbosity(self, value):
        if getattr(self, "problem", None) is not None:
            self.problem.problem.settings["verbose"] = int(value)
        self._verbosity = value

    @property
    def timeout(self):
        return self.problem.problem.settings["time_limit"]

    @timeout.setter
    def timeout(self, value):
        if getattr(self, "problem", None) is not None:
            if value is None:
                self.problem.problem.settings["time_limit"] = 0
            else:
                self.problem.problem.settings["time_limit"] = value

    def __getstate__(self):
        return {
            "presolve": self.presolve,
            "timeout": self.timeout,
            "verbosity": self.verbosity,
            "tolerances": {
                "feasibility": self.tolerances.feasibility,
                "optimality": self.tolerances.optimality,
                "integrality": self.tolerances.integrality,
            },
        }

    def __setstate__(self, state):
        for key, val in state.items():
            if key != "tolerances":
                setattr(self, key, val)
        for key, val in state["tolerances"].items():
            if key in self._tolerance_functions():
                setattr(self.tolerances, key, val)

    @property
    def qp_method(self):
        """Change the algorithm used to optimize QP problems."""
        return self.problem.problem.settings["qp_method"]

    @qp_method.setter
    def qp_method(self, value):
        if value not in _QP_METHODS:
            raise ValueError(
                "%s is not a valid qp_method. Choose between %s"
                % (value, str(self.qp_methods))
            )
        if getattr(self, "problem", None) is not None:
            self.problem.problem.settings["qp_method"] = value
        self._qp_method = value

    def _get_feasibility(self):
        return self.problem.problem.settings["primal_inf_tolerance"]

    def _set_feasibility(self, value):
        self.problem.problem.settings["primal_inf_tolerance"] = value
        self.problem.problem.settings["dual_inf_tolerance"] = value

    def _get_integrality(self):
        return self.problem.problem.settings["mip_tolerance"]

    def _set_integrality(self, value):
        self.problem.problem.settings["mip_tolerance"] = value

    def _get_optimality(self):
        return self.problem.problem.settings["optimality_tolerance"]

    def _set_optimality(self, value):
        self.problem.problem.settings["optimality_tolerance"] = value

    def _tolerance_functions(self):
        return {
            "feasibility": (self._get_feasibility, self._set_feasibility),
            "optimality": (self._get_optimality, self._set_optimality),
            "integrality": (self._get_integrality, self._set_integrality),
        }


class Model(interface.Model):
    ProblemClass = MatrixProblem
    status_map = _STATUS_MAP

    def _initialize_problem(self):
        self.problem = self.ProblemClass()

    def _initialize_model_from_problem(self, problem, vc_mapping=None, offset=0):
        if not isinstance(problem, self.ProblemClass):
            raise TypeError("Provided problem is not a valid OSQP model.")
        self.problem = problem
        for name in self.problem.variables:
            var = Variable(
                name,
                lb=self.problem.variable_lbs[name],
                ub=self.problem.variable_ubs[name],
                problem=self,
            )
            super()._add_variables([var])

        for name in self.problem.constraints:
            # Since constraint expressions are lazily retrieved from the
            # solver they don't have to be built here
            lhs = symbolics.Integer(0)
            constr = Constraint(
                lhs,
                lb=self.problem.constraint_lbs[name],
                ub=self.problem.constraint_ubs[name],
                name=name,
                problem=self,
            )

            super()._add_constraints([constr], sloppy=True)

        if vc_mapping is None:
            for constr in self.constraints:
                name = constr.name
                for variable in constr.variables:
                    try:
                        self._variables_to_constraints_mapping[variable.name].add(name)
                    except KeyError:
                        self._variables_to_constraints_mapping[variable.name] = set(
                            [name]
                        )
        else:
            self._variables_to_constraints_mapping = vc_mapping

        linear_expression = add(
            [
                coef * self._variables[vn]
                for vn, coef in self.problem.obj_linear_coefs.items()
            ]
        )
        quadratic_expression = add(
            [
                coef * self._variables[vn[0]] * self._variables[vn[1]]
                for vn, coef in self.problem.obj_quadratic_coefs.items()
            ]
        )

        self._objective_offset = offset
        self._objective = Objective(
            linear_expression + quadratic_expression + offset,
            problem=self,
            direction={-1: "max", 1: "min"}[self.problem.direction],
            name="matrix_objective",
        )

    @property
    def objective(self):
        return self._objective

    @objective.setter
    def objective(self, value):
        value.problem = None
        if self._objective is not None:  # Reset previous objective
            self.problem.obj_linear_coefs = dict()
            self.problem.obj_quadratic_coefs = dict()
        super(Model, self.__class__).objective.fset(self, value)
        self.update()
        expression = self._objective._expression
        (
            offset,
            linear_coefficients,
            quadratic_coeffients,
        ) = parse_optimization_expression(value, quadratic=True, expression=expression)
        self._objective_offset = offset
        if linear_coefficients:
            self.problem.obj_linear_coefs = {
                v.name: float(c) for v, c in linear_coefficients.items()
            }

        for key, coef in quadratic_coeffients.items():
            if len(key) == 1:
                var = next(iter(key))
                self.problem.obj_quadratic_coefs[(var.name, var.name)] = float(coef)
            else:
                var1, var2 = key
                self.problem.obj_quadratic_coefs[(var1.name, var2.name)] = 0.5 * float(
                    coef
                )
                self.problem.obj_quadratic_coefs[(var2.name, var1.name)] = 0.5 * float(
                    coef
                )

        self._set_objective_direction(value.direction)
        value.problem = self

    def _set_objective_direction(self, direction):
        self.problem.direction = {"min": 1, "max": -1}[direction]

    def _get_primal_values(self):
        if len(self.problem.primals) == 0:
            raise SolverError("The problem has not been solved yet!")
        primal_values = [self.problem.primals[v.name] for v in self._variables]
        return primal_values

    def _get_reduced_costs(self):
        if self.is_integer:
            raise ValueError("Dual values are not well-defined for integer problems")
        if len(self.problem.vduals) == 0:
            raise SolverError("The problem has not been solved yet!")
        reduced_costs = [self.problem.vduals[v.name] for v in self._variables]
        return reduced_costs

    def _get_constraint_values(self):
        if len(self.problem.cprimals) == 0:
            raise SolverError("The problem has not been solved yet!")
        constraint_primals = [self.problem.cprimals[c.name] for c in self._constraints]
        return constraint_primals

    def _get_shadow_prices(self):
        if self.is_integer:
            raise ValueError("Dual values are not well-defined for integer problems")
        if len(self.problem.duals) == 0:
            raise SolverError("The problem has not been solved yet!")
        dual_values = [self.problem.duals[c.name] for c in self._constraints]
        return dual_values

    @property
    def is_integer(self):
        return len(self.problem.integer_vars) > 0

    def _optimize(self):
        self.update()
        self.problem.solve()
        prior_status = self.problem.status
        self._original_status = prior_status
        status = self.status_map[prior_status]
        return status

    def optimize(self):
        self.update()
        status = self._optimize()
        self._status = status
        return status

    def _set_variable_bounds_on_problem(self, var_lb, var_ub):
        self.problem.reset()
        for var, val in var_lb:
            lb = -Infinity if val is None else float(val)
            self.problem.variable_lbs[var.name] = lb
        for var, val in var_ub:
            ub = Infinity if val is None else val
            self.problem.variable_ubs[var.name] = float(ub)

    def _add_variables(self, variables):
        super()._add_variables(variables)
        self.problem.reset()
        for variable in variables:
            lb = -Infinity if variable.lb is None else float(variable.lb)
            ub = Infinity if variable.ub is None else float(variable.ub)
            self.problem.variables.add(variable.name)
            self.problem.variable_lbs[variable.name] = lb
            self.problem.variable_ubs[variable.name] = ub
            if variable.type in ("binary", "integer"):
                self.problem.integer_vars.add(variable.name)
            variable.problem = self

    def _remove_variables(self, variables):
        # Not calling parent method to avoid expensive variable removal
        # from sympy expressions
        if self.objective is not None:
            self.objective._expression = self.objective.expression.xreplace(
                {var: 0 for var in variables}
            )
        for variable in variables:
            del self._variables_to_constraints_mapping[variable.name]
            variable.problem = None
            del self._variables[variable.name]
            self.problem.variables.remove(variable.name)
        self.problem.prune()

    def _add_constraints(self, constraints, sloppy=False):
        super()._add_constraints(constraints, sloppy=sloppy)
        self.problem.reset()
        for constraint in constraints:
            # This needs to be done in order to not trigger constraint._get_expression()
            constraint._problem = None
            if constraint.is_Linear:
                _, coeff_dict, _ = parse_optimization_expression(constraint)
                lb = -Infinity if constraint.lb is None else float(constraint.lb)
                ub = Infinity if constraint.ub is None else float(constraint.ub)
                self.problem.constraints.add(constraint.name)
                self.problem.constraint_coefs.update(
                    {
                        (constraint.name, v.name): float(co)
                        for v, co in coeff_dict.items()
                    }
                )
                self.problem.constraint_lbs[constraint.name] = lb
                self.problem.constraint_ubs[constraint.name] = ub
                constraint.problem = self
            elif constraint.is_Quadratic:
                raise NotImplementedError(
                    "Quadratic constraints (like %s) are not supported "
                    "in this interface yet." % constraint
                )
            else:
                raise ValueError(
                    "This interface only supports linear or quadratic constraints. "
                    "%s is neither linear nor quadratic." % constraint
                )

    def _remove_constraints(self, constraints):
        super()._remove_constraints(constraints)
        for constraint in constraints:
            self.problem.constraints.remove(constraint.name)
        self.problem.prune()

    def _get_variable_indices(self, names):
        vmap = dict(zip(self.variables, range(len(self.variables))))
        return [vmap[n] for n in names]

    def __setstate__(self, d):
        self.__init__()
        problem = pickle.loads(d["problem"])
        # workaround to conserve the order
        problem.variables = d["variables"]
        problem.constraints = d["constraints"]
        self._initialize_model_from_problem(
            problem, vc_mapping=d["v_to_c"], offset=d["offset"]
        )
        problem.variables = set(problem.variables)
        problem.constraints = set(problem.constraints)
        self.configuration = self.interface.Configuration()
        self.configuration.problem = self
        self.configuration.__setstate__(d["config"])

    def __getstate__(self):
        self.problem.reset()
        self.update()
        return {
            "problem": pickle.dumps(self.problem),
            "variables": [v.name for v in self._variables],
            "constraints": [c.name for c in self._constraints],
            "v_to_c": self._variables_to_constraints_mapping,
            "config": self.configuration.__getstate__(),
            "offset": getattr(self, "_objective_offset", 0.0),
        }

    @classmethod
    def from_lp(self, lp_problem_str):
        """Read a model from an LP file.

        The solver may not have an integrated LP reader so it will either use
        cplex or glpk to read the model. This means that QP problems will
        currently require cplex to be read :(
        """
        if available_solvers["CPLEX"]:
            from optlang import cplex_interface

            mod = cplex_interface.Model.from_lp(lp_problem_str)
            mod.configuration.lp_method = "auto"
            mod.configuration.qp_method = "auto"
            return super().clone(mod)
        else:
            from optlang import glpk_interface

            mod = glpk_interface.Model.from_lp(lp_problem_str)
            mod.configuration.lp_method = "auto"
            return super().clone(mod)