KarrLab/wc_sim

View on GitHub
wc_sim/submodels/dfba.py

Summary

Maintainability
D
1 day
Test Coverage
A
98%
""" A submodel that uses Dynamic Flux Balance Analysis (dFBA) to model a set of reactions

:Author: Yin Hoon Chew <yinhoon.chew@mssm.edu>
:Author: Arthur Goldberg <Arthur.Goldberg@mssm.edu>
:Date: 2020-07-29
:Copyright: 2016-2020, Karr Lab
:License: MIT
"""

import collections
import conv_opt
import copy
import enum
import itertools
import math
import scipy.constants
import wc_lang

from wc_sim import message_types
from wc_sim.config import core as config_core_multialgorithm
from wc_sim.multialgorithm_errors import DynamicMultialgorithmError, MultialgorithmError
from wc_sim.submodels.dynamic_submodel import ContinuousTimeSubmodel


class DfbaSubmodel(ContinuousTimeSubmodel):
    """ Use dynamic Flux Balance Analysis to predict the dynamics of chemical species in a container

    Attributes:
        DFBA_BOUND_SCALE_FACTOR (:obj:`float`): scaling factor for the bounds on reactions and
            constraints that avoid negative species populations; the default value is 1.
        DFBA_COEF_SCALE_FACTOR (:obj:`float`): scaling factor for the stoichiometric coefficients in 
            dFBA objective reactions; the default value is 1.
        SOLVER (:obj:`str`): name of the selected solver in conv_opt, the default value is 'cplex'
        PRESOLVE (:obj:`str`): presolve mode in `conv_opt` ('auto', 'on', 'off'), the default
            value is 'on'
        SOLVER_OPTIONS (:obj:`dict`): parameters for the solver; default values are provided for
            'cplex'
        OPTIMIZATION_TYPE (:obj:`str`): direction of optimization ('maximize', 'max',
            'minimize', 'min'); the default value is 'maximize'
        FLUX_BOUNDS_VOLUMETRIC_COMPARTMENT_ID (:obj:`str`): id of the compartment to which the
            measured flux bounds are normalized, the default is the whole-cell
        VERBOSITY (:obj:`str`): output verbosity of the solver
        NEG_POP_CONSTRAINTS (:obj:`boolean`): whether the constraints that
            prevent negative species over the next time-step should be used; defaults to :obj:`True`
        dfba_solver_options (:obj:`dict` of :obj:`str`: :obj:`any`): options for solving dFBA submodel
        reaction_fluxes (:obj:`dict` of :obj:`str`: :obj:`float`): reaction fluxes data structure,
            which is pre-allocated
        dfba_obj_expr (:obj:`ParsedExpression`): an analyzed and validated dFBA objective expression
        exchange_rxns (:obj:`set` of :obj:`wc_lang.Reactions`): set of exchange and demand reactions
        _multi_reaction_constraints (:obj:`dict` of :obj:`str`: :obj:`conv_opt.Constraint`): a map from
            constraint id to constraints that avoid negative species populations
            in `self._conv_model.constraints`
        _conv_model (:obj:`conv_opt.Model`): linear programming model in `conv_opt` format
        _conv_variables (:obj:`dict` of :obj:`str`: :obj:`conv_opt.Variable`): a dictionary mapping
            reaction IDs to their associated `conv_opt.Variable` objects
        _conv_metabolite_matrices (:obj:`dict` of :obj:`str`: :obj:`list`): a dictionary mapping metabolite
            species IDs to lists of :obj:`conv_opt.LinearTerm` objects; each :obj:`conv_opt.LinearTerm`
            associates a reaction that the species participates in with the species' stoichiometry
            in the reaction
        _dfba_obj_reactions (:obj:`dict` of :obj:`str`: :obj:`wc_lang.DfbaObjReaction`): all
            :obj:`wc_lang.DfbaObjReaction`\ s used by the :obj:`self.dfba_obj_expr`
        _dfba_obj_species (:obj:`list` of :obj:`wc_lang.DfbaObjSpecies:`): all species in
            :obj:`DfbaObjReaction`\ s used by `dfba_obj_expr`, keyed by their IDs
        _reaction_bounds (:obj:`dict` of :obj:`str`: :obj:`tuple`): a dictionary that maps reaction IDs
            to (minimum bound, maximum bound) tuples
        _optimal_obj_func_value (:obj:`float`): the value of objective function returned by the solver
    """
    # default options
    DFBA_BOUND_SCALE_FACTOR = 1.
    DFBA_COEF_SCALE_FACTOR = 1.
    SOLVER = 'cplex'
    PRESOLVE = 'on'
    SOLVER_OPTIONS = {
        'cplex': {
            'parameters': {
                'emphasis': {
                    'numerical': 1,
                },
                'read': {
                    'scale': 1,
                },
            },
        },
    }
    OPTIMIZATION_TYPE = 'maximize'
    VERBOSITY = 'off'
    FLUX_BOUNDS_VOLUMETRIC_COMPARTMENT_ID = 'wc'
    NEG_POP_CONSTRAINTS = True

    # register the message types sent by DfbaSubmodel
    messages_sent = [message_types.RunFba]

    # register 'handle_RunFba_msg' to handle RunFba events
    event_handlers = [(message_types.RunFba, 'handle_RunFba_msg')]

    time_step_message = message_types.RunFba

    def __init__(self, id, dynamic_model, reactions, species, dynamic_compartments,
                 local_species_population, dfba_time_step, options=None):
        """ Initialize a dFBA submodel instance

        Args:
            id (:obj:`str`): unique id of this dFBA submodel
            dynamic_model (:obj: `DynamicModel`): the simulation's central coordinator
            reactions (:obj:`list` of `wc_lang.Reaction`): the reactions modeled by this dFBA submodel
            species (:obj:`list` of `wc_lang.Species`): the species that participate in the reactions
                modeled by this dFBA submodel
            dynamic_compartments (:obj: `dict`): `DynamicCompartment`s, keyed by id, that contain
                species which participate in reactions that this dFBA submodel models, including
                adjacent compartments used by its transfer reactions
            local_species_population (:obj:`LocalSpeciesPopulation`): the store that maintains this
                dFBA submodel's species population
            dfba_time_step (:obj:`float`): time interval between FBA optimization
            options (:obj:`dict`, optional): dFBA submodel options

        Raises:
            :obj:`MultiAlgorithmError`: if the `dynamic_dfba_objective` cannot be found,
                or if some reactions are reversible,
                or if the provided 'dfba_bound_scale_factor' in options does not have a positive value,
                or if the provided 'dfba_coef_scale_factor' in options does not have a positive value,
                or if the provided 'solver' in options is not a valid value,
                or if the provided 'presolve' in options is not a valid value,
                or if the 'solver' value provided in the 'solver_options' in options is not the same
                as the name of the selected `conv_opt.Solver`,
                or if the provided 'flux_bounds_volumetric_compartment_id' is not a valid
                compartment ID in the model
        """
        super().__init__(id, dynamic_model, reactions, species, dynamic_compartments,
                         local_species_population, dfba_time_step, options)

        self.dfba_solver_options = {
            'dfba_bound_scale_factor': self.DFBA_BOUND_SCALE_FACTOR,
            'dfba_coef_scale_factor': self.DFBA_COEF_SCALE_FACTOR,
            'solver': self.SOLVER,
            'presolve': self.PRESOLVE,
            'solver_options': self.SOLVER_OPTIONS,
            'optimization_type': self.OPTIMIZATION_TYPE,
            'flux_bounds_volumetric_compartment_id': self.FLUX_BOUNDS_VOLUMETRIC_COMPARTMENT_ID,
            'verbosity': self.VERBOSITY,
            'negative_pop_constraints': self.NEG_POP_CONSTRAINTS
        }
        if options is not None:
            if 'dfba_bound_scale_factor' in options:
                if options['dfba_bound_scale_factor'] <= 0.:
                    raise MultialgorithmError(f"DfbaSubmodel {self.id}: dfba_bound_scale_factor must"
                        f" be larger than zero but is {options['dfba_bound_scale_factor']}")
                self.dfba_solver_options['dfba_bound_scale_factor'] = options['dfba_bound_scale_factor']
            if 'dfba_coef_scale_factor' in options:
                if options['dfba_coef_scale_factor'] <= 0.:
                    raise MultialgorithmError(f"DfbaSubmodel {self.id}: dfba_coef_scale_factor must"
                        f" be larger than zero but is {options['dfba_coef_scale_factor']}")
                self.dfba_solver_options['dfba_coef_scale_factor'] = options['dfba_coef_scale_factor']
            if 'solver' in options:
                if options['solver'] not in conv_opt.Solver.__members__:
                    raise MultialgorithmError(f"DfbaSubmodel {self.id}: {options['solver']}"
                        f" is not a valid Solver")
                self.dfba_solver_options['solver'] = options['solver']
            if 'presolve' in options:
                if options['presolve'] not in conv_opt.Presolve.__members__:
                    raise MultialgorithmError(f"DfbaSubmodel {self.id}: {options['presolve']}"
                        f" is not a valid Presolve option")
                self.dfba_solver_options['presolve'] = options['presolve']
            if 'solver_options' in options:
                if self.dfba_solver_options['solver'] not in options['solver_options']:
                    raise MultialgorithmError(f"DfbaSubmodel {self.id}: the solver key in"
                        f" solver_options is not the same as the selected solver"
                        f" '{self.dfba_solver_options['solver']}'")
                self.dfba_solver_options['solver_options'] = options['solver_options']
            if 'optimization_type' in options:
                if options['optimization_type'] not in conv_opt.ObjectiveDirection.__members__:
                    raise MultialgorithmError(f"DfbaSubmodel {self.id}: the optimization_type in"
                        f" options can only take 'maximize', 'max', 'minimize' or 'min' as value but is"
                        f" '{options['optimization_type']}'")
                self.dfba_solver_options['optimization_type'] = options['optimization_type']
            if 'flux_bounds_volumetric_compartment_id' in options:
                comp_id = options['flux_bounds_volumetric_compartment_id']
                if comp_id != self.FLUX_BOUNDS_VOLUMETRIC_COMPARTMENT_ID:
                    try:
                        flux_bound_comp = self.dynamic_model.dynamic_compartments[comp_id]
                    except:
                        raise MultialgorithmError(f"DfbaSubmodel {self.id}: the user-provided"
                            f" flux_bounds_volumetric_compartment_id '{comp_id}' is not the ID"
                            f" of a compartment in the model")
                self.dfba_solver_options['flux_bounds_volumetric_compartment_id'] = comp_id
            if 'verbosity' in options:
                if options['verbosity'] not in conv_opt.Verbosity.__members__:
                    raise MultialgorithmError(f"DfbaSubmodel {self.id}: the verbosity in"
                        f" options must be one of {set(conv_opt.Verbosity.__members__.keys())} but is"
                        f" '{options['verbosity']}'")
                self.dfba_solver_options['verbosity'] = options['verbosity']
            if 'negative_pop_constraints' in options:
                self.dfba_solver_options['negative_pop_constraints'] = options['negative_pop_constraints']

        # ensure that all reactions are irreversible
        errors = []
        for rxn in self.reactions:
            if rxn.reversible:
                errors.append(rxn.id)
        if errors:
            rxn_ids = ', '.join(errors)
            raise MultialgorithmError(f"DfbaSubmodel {self.id}: reactions are reversible: {rxn_ids}")

        # determine set of exchange reactions, each of which has only one participant
        # TODO (APG): should we use the exchange reaction pattern from the config instead?
        self.exchange_rxns = set()
        for rxn in self.reactions:
            if len(rxn.participants) == 1:
                self.exchange_rxns.add(rxn)

        # get the dfba objective's expression
        dfba_objective_id = f'dfba-obj-{id}'
        if dfba_objective_id not in dynamic_model.dynamic_dfba_objectives: # pragma: no cover
            raise MultialgorithmError(f"DfbaSubmodel '{self.id}': cannot find dynamic_dfba_objective "
                                      f"{dfba_objective_id}")
        self.dfba_obj_expr = dynamic_model.dynamic_dfba_objectives[dfba_objective_id].wc_lang_expression

        # collect all wc_lang.DfbaObjReactions used by dfba_obj_expr
        self._dfba_obj_reactions = {}
        for rxn_cls in self.dfba_obj_expr.related_objects:
            if issubclass(rxn_cls, wc_lang.DfbaObjReaction):
                for rxn in self.dfba_obj_expr.related_objects[rxn_cls].values():
                    self._dfba_obj_reactions[rxn.id] = rxn

        # ensure that the dfba objective doesn't contain exchange rxns
        errors = []
        for rxn_cls in self.dfba_obj_expr.related_objects:
            for rxn in self.dfba_obj_expr.related_objects[rxn_cls].values():
                if rxn in self.exchange_rxns:
                    errors.append(rxn.id)
        if errors:
            rxns = ', '.join(errors)
            raise MultialgorithmError(f"the dfba objective '{dfba_objective_id}' "
                                      f"uses exchange reactions: {rxns}")

        # TODO (APG): warn if species in dFBA objective reactions aren't used by metabolic reactions

        # log initialization data
        self.log_with_time("init: id: {}".format(id))
        self.log_with_time("init: time_step: {}".format(str(dfba_time_step)))

        self.set_up_dfba_submodel()
        self.set_up_optimizations()

        self._model_dumps = 0

    def set_up_dfba_submodel(self):
        """ Set up a dFBA submodel, by converting to a linear programming matrix

        Raises:
            :obj:`MultiAlgorithmError`: if the ids in :obj:`DfbaObjReaction`\ s and :obj:`Reactions`\ s intersect
        """
        self.set_up_continuous_time_submodel()

        ### dFBA specific code ###

        # raise an error if the ids in DfbaObjReactions and Reactions intersect
        reaction_ids = set([rxn.id for rxn in self.reactions])
        dfba_obj_reaction_ids = set(self._dfba_obj_reactions)
        if reaction_ids & dfba_obj_reaction_ids:
            raise MultialgorithmError(f"in model {self.dynamic_model.id} the ids in DfbaObjReactions "
                                      f"and Reactions intersect: {reaction_ids & dfba_obj_reaction_ids}")
        # TODO (APG): later: support colliding ids by creating unique ids prefixed by class

        # Formulate the optimization problem using the conv_opt package
        self._conv_model = conv_opt.Model(name='model')
        self._conv_variables = {}
        self._conv_metabolite_matrices = collections.defaultdict(list)
        for rxn in self.reactions:
            self._conv_variables[rxn.id] = conv_opt.Variable(
                name=rxn.id, type=conv_opt.VariableType.continuous)
            self._conv_model.variables.append(self._conv_variables[rxn.id])
            for part in rxn.participants:
                self._conv_metabolite_matrices[part.species.id].append(
                    conv_opt.LinearTerm(self._conv_variables[rxn.id], part.coefficient))

        self._dfba_obj_species = []
        for rxn in self._dfba_obj_reactions.values():
            self._conv_variables[rxn.id] = conv_opt.Variable(
                name=rxn.id, type=conv_opt.VariableType.continuous, lower_bound=0.)
            self._conv_model.variables.append(self._conv_variables[rxn.id])
            for part in rxn.dfba_obj_species:
                self._dfba_obj_species.append(part)
                self._conv_metabolite_matrices[part.species.id].append(
                    conv_opt.LinearTerm(self._conv_variables[rxn.id], part.value))

        # Set up the objective function
        errors = []
        dfba_obj_expr_objs = self.dfba_obj_expr.lin_coeffs
        for rxn_cls in dfba_obj_expr_objs.values():
            for rxn, lin_coef in rxn_cls.items():
                if math.isnan(lin_coef):
                    errors.append(rxn.id)
                self._conv_model.objective_terms.append(conv_opt.LinearTerm(
                    self._conv_variables[rxn.id], lin_coef))
        if errors:
            rxn_ids = ', '.join(errors)
            raise MultialgorithmError(f"objective function not linear: reaction(s) have NaN coefficient(s) "
                                      f"in its expression: {rxn_ids}")

        self._conv_model.objective_direction = \
            conv_opt.ObjectiveDirection[self.dfba_solver_options['optimization_type']]

        self._multi_reaction_constraints = self.initialize_neg_species_pop_constraints()

    def get_conv_model(self):
        """ Get the `conv_opt` model

        Returns:
            :obj:`conv_opt.Model`: the linear programming model in `conv_opt` format
        """
        return self._conv_model

    @staticmethod
    def _get_species_and_stoichiometry(reaction):
        """ Get a reaction's species and their net stoichiometries

        Handles both :obj:`wc_lang.Reaction` and :obj:`wc_lang.DfbaObjReaction` reactions.

        Args:
            reaction (:obj:`wc_lang.Reaction` or :obj:`wc_lang.DfbaObjReaction`): a reaction

        Returns:
            :obj:`dict`: map from species to net stoichiometry for each species in `reaction`,
            with entries which have net stoichiometry of 0. removed
        """
        # get net coefficients, since a species can participate in both sides of a reaction
        species_net_coefficients = collections.defaultdict(float)
        if isinstance(reaction, wc_lang.Reaction):
            for part in reaction.participants:
                species_net_coefficients[part.species] += part.coefficient

        # pragma: no cover false branch in this elif; ignore missing branch coverage report
        elif isinstance(reaction, wc_lang.DfbaObjReaction):
            for part in reaction.dfba_obj_species:
                species_net_coefficients[part.species] += part.value

        species_to_rm = [s for s in species_net_coefficients if species_net_coefficients[s] == 0.]
        for species in species_to_rm:
            del species_net_coefficients[species]

        return species_net_coefficients

    NEG_POP_CONSTRAINT_PREFIX = 'neg_pop_constr'
    NEG_POP_CONSTRAINT_SEP = '__'
    LB = '__LB__'
    RB = '__RB__'

    @staticmethod
    def species_id_without_brkts(species_id):
        """ Replace brackets in a species id with codes

        Args:
            species_id (:obj:`str`): WC Lang species id

        Returns:
            :obj:`str`: species id with brackets replaced by codes

        Raises:
            :obj:`MultiAlgorithmError`: if `species_id` isn't a properly formatted :obj:`wc_lang.Species` id,
                or has bracket codes
        """
        try:
            wc_lang.Species.parse_id(species_id)
        except ValueError as e:
            raise MultialgorithmError(e)
        if DfbaSubmodel.LB in species_id or DfbaSubmodel.RB in species_id:
            raise MultialgorithmError(f"species_id '{species_id}' already has bracket code(s)")
        return species_id.replace('[', DfbaSubmodel.LB).replace(']', DfbaSubmodel.RB)

    @staticmethod
    def species_id_with_brkts(species_id):
        """ Replace codes in a species id with brackets

        Args:
            species_id (:obj:`str`): WC Lang species id with brackets replaced by codes

        Returns:
            :obj:`str`: standard WC Lang species id

        Raises:
            :obj:`MultiAlgorithmError`: if `species_id` doesn't have bracket codes or has brackets
        """
        if (DfbaSubmodel.LB not in species_id or DfbaSubmodel.RB not in species_id or
            '[' in species_id or ']' in species_id):
            raise MultialgorithmError(f"invalid species_id with bracket codes '{species_id}' it should be "
                                      f"species_type_id{DfbaSubmodel.LB}compartment_id{DfbaSubmodel.RB}")
        return species_id.replace(DfbaSubmodel.LB, '[').replace(DfbaSubmodel.RB, ']')

    @staticmethod
    def gen_neg_species_pop_constraint_id(species_id):
        """ Generate a negative species population constraint id

        Args:
            species_id (:obj:`str`): id of species being constrained

        Returns:
            :obj:`str`: a negative species population constraint id
        """
        return DfbaSubmodel.NEG_POP_CONSTRAINT_PREFIX + DfbaSubmodel.NEG_POP_CONSTRAINT_SEP + species_id

    @staticmethod
    def parse_neg_species_pop_constraint_id(neg_species_pop_constraint_id):
        """ Parse a negative species population constraint id

        Args:
            neg_species_pop_constraint_id (:obj:`str`): a negative species population constraint id

        Returns:
            :obj:`str`: id of species being constrained
        """
        loc = len(DfbaSubmodel.NEG_POP_CONSTRAINT_PREFIX) + len(DfbaSubmodel.NEG_POP_CONSTRAINT_SEP)
        return neg_species_pop_constraint_id[loc:]

    def initialize_neg_species_pop_constraints(self):
        """ Make constraints that prevent species populations from going negative

        A separate constraint is made for each species. These constraints prevent the species population
        from declining so quickly that it becomes negative in the next time step.
        Call this when a dFBA submodel is initialized.

        Do nothing if negative species population constraints are not being used.

        Returns:
            :obj:`dict` of :obj:`str`: :obj:`conv_opt.Constraint`: a map from constraint id to
            constraints stored in `self._conv_model.constraints`
        """

        if not self.dfba_solver_options['negative_pop_constraints']:
            return {}

        # make map from species to pseudo-reactions that use the species
        reactions_using_species = collections.defaultdict(set)
        # add dFBA objective reactions and their species
        for rxn in self._dfba_obj_reactions.values():
            for species in self._get_species_and_stoichiometry(rxn):
                reactions_using_species[species].add(rxn)
        # add exchange reactions and their species
        for rxn in self.exchange_rxns:
            for species in self._get_species_and_stoichiometry(rxn):
                reactions_using_species[species].add(rxn)

        multi_reaction_constraints = {}
        for species, rxns in reactions_using_species.items():
            # create an expression for species' rate of change, in molecules / sec
            # ds/dt for species s is sum(coef * flux) for all rxns that use s

            constr_expr = []
            for rxn in rxns:
                for rxn_species, net_coef in self._get_species_and_stoichiometry(rxn).items():
                    if rxn_species == species:
                        constr_expr.append(conv_opt.LinearTerm(self._conv_variables[rxn.id], net_coef))

            # optimization: only create a Constraint when the species can be consumed,
            # which can only occur when some of its net coefficients are negative
            if any([linear_term.coefficient < 0 for linear_term in constr_expr]):
                # before solving FBA bound_neg_species_pop_constraints()  will set lower_bound of constraint
                # to the rate at which the amount of species goes to 0 in the next time step
                # constraint keeps the amount of species >= 0 over the time step
                constraint_id = DfbaSubmodel.gen_neg_species_pop_constraint_id(species.id)
                constraint = conv_opt.Constraint(constr_expr,
                                                 name=constraint_id,
                                                 lower_bound=None,
                                                 upper_bound=None)
                self._conv_model.constraints.append(constraint)
                multi_reaction_constraints[constraint_id] = constraint

        return multi_reaction_constraints

    def set_up_optimizations(self):
        """ To improve performance, pre-compute and pre-allocate some data structures """
        self.set_up_continuous_time_optimizations()

        # pre-allocate dict of reaction fluxes
        self.reaction_fluxes = {rxn.id: float('NaN') for rxn in self.reactions}

        # initialize adjustments, the dict that will hold the species population change rates
        self.adjustments = {}
        for obj_species in self._dfba_obj_species:
            self.adjustments[obj_species.species.id] = 0.
        for exchange_rxn in self.exchange_rxns:
            self.adjustments[exchange_rxn.participants[0].species.id] = 0.

        for met_id, expression in self._conv_metabolite_matrices.items():
            self._conv_model.constraints.append(conv_opt.Constraint(expression, name=met_id,
                upper_bound=0.0, lower_bound=0.0))

    def get_reaction_fluxes(self):
        """ Get the reaction fluxes

        Returns:
            :obj:`dict` of :obj:`str`: :obj:`float`: reaction fluxes
        """
        return self.reaction_fluxes

    def determine_bounds(self):
        """ Determine the minimum and maximum flux bounds for each reaction

        Bounds provided by rate laws or flux bound constants in the model are written
        to `self._reaction_bounds`.
        """
        flux_comp_id = self.dfba_solver_options['flux_bounds_volumetric_compartment_id']
        if flux_comp_id == self.FLUX_BOUNDS_VOLUMETRIC_COMPARTMENT_ID:
            flux_comp_volume = self.dynamic_model.cell_volume()
        else:
            flux_comp_volume = self.dynamic_model.dynamic_compartments[flux_comp_id].volume()

        self._reaction_bounds = {}
        for reaction in self.reactions:

            # defaults
            # the default minimum constraint of an irreversible reaction is 0
            min_constr = 0.
            # None indicates no maximum constraint
            max_constr = None

            # if a rate law is available, use it to compute a max bound
            if reaction.rate_laws:
                max_constr = self.calc_reaction_rate(reaction, use_enabled=False)

            # otherwise use the fixed bounds
            elif reaction.flux_bounds:

                rxn_bounds = reaction.flux_bounds
                if isinstance(rxn_bounds.min, (int, float)) and 0 <= rxn_bounds.min:
                    min_constr = rxn_bounds.min * flux_comp_volume * scipy.constants.Avogadro

                if isinstance(rxn_bounds.max, (int, float)) and not math.isnan(rxn_bounds.max):
                    max_constr = rxn_bounds.max * flux_comp_volume * scipy.constants.Avogadro

            self._reaction_bounds[reaction.id] = (min_constr, max_constr)

    def bound_neg_species_pop_constraints(self):
        """ Update bounds in the negative species population constraints that span multiple reactions

        Update the bounds in each constraint in `self._multi_reaction_constraints` that
        prevents a species from having a negative species population in the next time step.

        Call this before each run of the FBA solver.
        """
        # set bounds in multi-reaction constraints
        for constraint_id, constraint in self._multi_reaction_constraints.items():
            species_id = DfbaSubmodel.parse_neg_species_pop_constraint_id(constraint_id)
            species_pop = self.local_species_population.read_one(self.time, species_id)
            max_allowed_consumption_of_species = species_pop / self.time_step
            constraint.lower_bound = -max_allowed_consumption_of_species

    def update_bounds(self):
        """ Update the minimum and maximum bounds of `conv_opt.Variable` based on
            the values in `self._reaction_bounds`
        """
        for rxn_id, (min_constr, max_constr) in self._reaction_bounds.items():
            self._conv_variables[rxn_id].lower_bound = min_constr
            self._conv_variables[rxn_id].upper_bound = max_constr
        self.bound_neg_species_pop_constraints()

    def del_infeasible_rxns(self, conv_opt_model, copy_model=True):
        """ Delete infeasible reactions from a convex optimization model

        Delete reactions with lower bound == upper bound == 0.

        Args:
            conv_opt_model (:obj:`conv_opt.Model`): a convex optimization model
            copy_model (:obj:`boolean`, optional): whether to copy the convex optimization model
                before modifying it; defaults to :obj:`True`

        Returns:
            :obj:`conv_opt.Model`: the convex optimization model with infeasible reactions removed
        """
        infeasible_rxns = set()
        # infeasible rxns are the variables with lower bound == upper bound == 0.
        # remove infeasible rxns from the variables
        # remove infeasible rxns from constraints that use them
        # remove constraints that have no terms remaining
        if copy_model:
            conv_opt_model = copy.deepcopy(conv_opt_model)

    def compute_population_change_rates(self):
        """ Compute the rate of change of the populations of species used by this dFBA

        Because FBA obtains a steady-state solution for reaction fluxes, only species that
        participate in the exchange reactions or dFBA objective pseudo-reactions
        at the edge of the FBA network can have non-zero rates of change.

        Updates the existing dict `self.adjustments`.
        """
        # Calculate the adjustment for each species in a pseudo-reaction
        # as the sum over reactions of stoichiometry * reaction flux

        for species_id in self.adjustments:
            self.adjustments[species_id] = 0

        # Compute for exchange species
        for exchange_rxn in self.exchange_rxns:
            self.adjustments[exchange_rxn.participants[0].species.id] -= \
                exchange_rxn.participants[0].coefficient * self.reaction_fluxes[exchange_rxn.id]

        # Compute for dFBA objective species
        for obj_species in self._dfba_obj_species:
            self.adjustments[obj_species.species.id] -= \
                obj_species.value * self.reaction_fluxes[obj_species.dfba_obj_reaction.id]

    def scale_conv_opt_model(self, conv_opt_model, copy_model=True,
                             dfba_bound_scale_factor=None, dfba_coef_scale_factor=None):
        """ Apply scaling factors to a `conv_opt` model

        Scaling factors can be used to scale the size of bounds and objective reaction 
        stoichiometric coefficients to address numerical problems with the linear programming solver.
        They are elements of `dfba_solver_options`.
        The `dfba_bound_scale_factor` option scales the bounds on reactions and constraints
        that avoid negative species populations.
        The `dfba_coef_scale_factor` scales the stoichiometric coefficients in dFBA objective
        reactions. Scaling is done by the this method.
        Symmetrically, the solution results are returned to the scale of the whole-cell model by
        inverting the consequences of these scaling factors.
        This is done by the `unscale_conv_opt_solution` method.

        Args:
            conv_opt_model (:obj:`conv_opt.Model`): a convex optimization model
            copy_model (:obj:`boolean`, optional): whether to copy the convex optimization model
                before scaling it; defaults to :obj:`True`
            dfba_bound_scale_factor (:obj:`float`, optional): factor used to scale the bounds on
                reactions and constraints that avoid negative species populations; if not supplied,
                is taken from `self.dfba_solver_options`
            dfba_coef_scale_factor (:obj:`float`, optional): factor used to scale the stoichiometric 
                coefficients in dFBA objective reactions; if not supplied, is taken from 
                `self.dfba_solver_options`

        Returns:
            :obj:`conv_opt.Model`: the scaled convex optimization model
        """
        if copy_model:
            conv_opt_model = copy.deepcopy(conv_opt_model)

        if dfba_bound_scale_factor is None:
            dfba_bound_scale_factor = self.dfba_solver_options['dfba_bound_scale_factor']
        if dfba_coef_scale_factor is None:
            dfba_coef_scale_factor = self.dfba_solver_options['dfba_coef_scale_factor']

        # scale bounds
        # skip non-numeric bounds, such as None
        for variable in conv_opt_model.variables:
            if isinstance(variable.lower_bound, (int, float)):
                variable.lower_bound *= dfba_bound_scale_factor
            if isinstance(variable.upper_bound, (int, float)):
                variable.upper_bound *= dfba_bound_scale_factor

        # scale bounds in constraints; bound values of 0 are unchanged
        for constraint in conv_opt_model.constraints:
            # this 'if' will always be true for properly formed constraints
            if isinstance(constraint.lower_bound, (int, float)):
                constraint.lower_bound *= dfba_bound_scale_factor
            if isinstance(constraint.upper_bound, (int, float)):
                constraint.upper_bound *= dfba_bound_scale_factor

        # scale stoichiometric coefficient of dfba objective reactions
        for rxn in self._dfba_obj_reactions.values():
            rxn_variable = [i for i in conv_opt_model.variables if i.name==rxn.id][0]
            for part in rxn.dfba_obj_species:
                # scale stoichiometric coefficient in steady-state constraint
                steady_state_const = [i for i in conv_opt_model.constraints if i.name==part.species.id][0]
                lin_terms = steady_state_const.terms
                obj_rxn_term = [i for i in lin_terms if i.variable==rxn_variable][0]
                obj_rxn_term.coefficient *= dfba_coef_scale_factor

                # scale stoichiometric coefficient in negative population constraint
                constraint_id = DfbaSubmodel.gen_neg_species_pop_constraint_id(part.species.id)
                neg_pop_const = [i for i in conv_opt_model.constraints if i.name==constraint_id]
                if neg_pop_const:
                    lin_terms = neg_pop_const[0].terms
                    obj_rxn_term = [i for i in lin_terms if i.variable==rxn_variable][0]
                    obj_rxn_term.coefficient *= dfba_coef_scale_factor

        return conv_opt_model

    def unscale_conv_opt_solution(self, dfba_bound_scale_factor=None, dfba_coef_scale_factor=None):
        """ Remove scaling factors from a `conv_opt` model solution

        Args:
            dfba_bound_scale_factor (:obj:`float`, optional): factor used to scale reaction and
                constraint bounds; if not supplied, is taken from `self.dfba_solver_options`
            dfba_coef_scale_factor (:obj:`float`, optional): factor used to scale the stoichiometric
                coefficients in dFBA objective reactions; if not supplied, is taken from 
                `self.dfba_solver_options`
        """
        if dfba_bound_scale_factor is None:
            dfba_bound_scale_factor = self.dfba_solver_options['dfba_bound_scale_factor']
        if dfba_coef_scale_factor is None:
            dfba_coef_scale_factor = self.dfba_solver_options['dfba_coef_scale_factor']

        for rxn_variable in self._conv_model.variables:
            if rxn_variable.name in self._dfba_obj_reactions:
                self.reaction_fluxes[rxn_variable.name] /= (dfba_bound_scale_factor / dfba_coef_scale_factor)
            else:    
                self.reaction_fluxes[rxn_variable.name] /= dfba_bound_scale_factor
        self._optimal_obj_func_value /= (dfba_bound_scale_factor / dfba_coef_scale_factor)

    def save_fba_solution(self, conv_opt_model, conv_opt_solution):
        """ Assign a FBA solution to local variables

        Args:
            conv_opt_model (:obj:`conv_opt.Model`): the convex optimization model that was solved
            conv_opt_solution (:obj:`conv_opt.Result`): the model's solution
        """
        self._optimal_obj_func_value = conv_opt_solution.value
        for rxn_variable in conv_opt_model.variables:
            self.reaction_fluxes[rxn_variable.name] = rxn_variable.primal

    def run_fba_solver(self):
        """ Run the FBA solver for one time step

        Raises:
            :obj:`DynamicMultiAlgorithmError`: if no optimal solution is found
        """
        self.determine_bounds()
        self.update_bounds()
        # print('\n--- WC Sim dFBA conv opt model ---')
        # print(ShowConvOptElements.show_conv_opt_model(self.get_conv_model()))

        # scale just before solving
        scaled_conv_opt_model = self.scale_conv_opt_model(self.get_conv_model())

        if self._model_dumps % 100 == 0:
            print('\n--- Scaled WC Sim dFBA conv opt model ---')
            print(ShowConvOptElements.show_conv_opt_model(scaled_conv_opt_model))
            print('--- END Scaled WC Sim dFBA conv opt model ---\n')

        # Set options for conv_opt solver
        options = conv_opt.SolveOptions(
            solver=conv_opt.Solver[self.dfba_solver_options['solver']],
            presolve=conv_opt.Presolve[self.dfba_solver_options['presolve']],
            solver_options=self.dfba_solver_options['solver_options']
        )
        if self._model_dumps % 100 == 0:
            # Set options for conv_opt solver
            options = conv_opt.SolveOptions(
                solver=conv_opt.Solver[self.dfba_solver_options['solver']],
                presolve=conv_opt.Presolve[self.dfba_solver_options['presolve']],
                verbosity=conv_opt.Verbosity[self.dfba_solver_options['verbosity']],
                solver_options=self.dfba_solver_options['solver_options']
            )

        # solve optimization model
        result = scaled_conv_opt_model.solve(options=options)
        end_time = self.time + self.time_step
        if result.status_code != conv_opt.StatusCode(0):
            raise DynamicMultialgorithmError(self.time, f"DfbaSubmodel {self.id}: "
                                                        f"No optimal solution found: "
                                                        f"'{result.status_message}' "
                                                        f"for time step [{self.time}, {end_time}]")
        # save and unscale the solution
        self.save_fba_solution(scaled_conv_opt_model, result)
        self.unscale_conv_opt_solution()

        if self._model_dumps % 100 == 0:
            print()
            print(f'--- time {self.time}: solution ---')
            non_zero_fluxes = [f for f in self.reaction_fluxes.values() if 0 < f]
            print(f'{len(non_zero_fluxes)} non-zero reaction fluxes')
            for rxn_id, flux in self.reaction_fluxes.items():
                if 0 < flux:
                    print(f"{rxn_id:<20} {flux:>10.2g}")
            print(f"objective {self._optimal_obj_func_value:>10.2g}")

        # Compute the population change rates
        self.compute_population_change_rates()

        ### store results in local_species_population ###
        self.local_species_population.adjust_continuously(self.time, self.id, self.adjustments,
                                                          time_step=self.time_step)

        # flush expressions that depend on species and reactions modeled by this dFBA submodel from cache
        # TODO (APG): OPTIMIZE DFBA CACHING: minimize flushing by implementing OPTIMIZE DFBA CACHING todos elsewhere
        self.dynamic_model.continuous_submodel_flush_after_populations_change(self.id)

        self._model_dumps += 1

    ### handle DES events ###
    def handle_RunFba_msg(self, event):
        """ Handle an event containing a RunFba message

        Args:
            event (:obj:`Event`): a simulation event
        """
        self.run_fba_solver()
        self.schedule_next_periodic_analysis()


# TODO (APG): in conv. opt. model output: cleanup; unittests; docstrings; move to conv_opt, etc.
# TODO (APG): report on values that are "not a double precision number (NaN)"
# TODO (APG): move to conv. opt. package
class ObjToRow(object):

    def __init__(self, col_widths, headers, attrs):
        self.col_widths = col_widths
        self.headers = headers
        self.attrs = attrs

    def header_rows(self):
        rv = []
        for header_row in self.headers:
            row = ''
            for col_header, width in zip(header_row, self.col_widths):
                row += f'{col_header:<{width}}'
            rv.append(row)
        return rv

    def obj_as_row(self, obj):
        row = ''
        for attr, width in zip(self.attrs, self.col_widths):
            value = getattr(obj, attr)
            str_value = f'{str(value):<{width-1}} '
            if isinstance(value, enum.Enum):
                str_value = f'{value.name:<{width-1}}'
            elif isinstance(value, float):
                str_value = f'{value:>{width-1}.2E} '
            elif isinstance(value, int):
                str_value = f'{value:>{width-1}d} '
            row += str_value
        return row


class ShowConvOptElements(object):

    @staticmethod
    def show_conv_opt_variable(header=False, variable=None):
        headers_1 = ('name', 'type', 'lower', 'upper')
        headers_2 = ('', '', 'bound', 'bound',)
        variable_to_row = ObjToRow((18, 18, 14, 14,),
                                   [headers_1, headers_2],
                                   ('name', 'type', 'lower_bound', 'upper_bound'))
        if header:
            return variable_to_row.header_rows()
        return variable_to_row.obj_as_row(variable)

    @staticmethod
    def show_conv_opt_constraint(header=False, constraint=None):
        # constraints: skip 'dual' which isn't used
        headers_1 = ('name', 'lower', 'upper')
        headers_2 = ('', 'bound', 'bound',)
        constraint_to_row = ObjToRow((22, 10, 10,),
                                     [headers_1, headers_2],
                                     ('name', 'lower_bound', 'upper_bound'))
        if header:
            return constraint_to_row.header_rows()
        return constraint_to_row.obj_as_row(constraint)

    @staticmethod
    def show_conv_opt_variable_term(header=False, variable_term=None):
        # I presume that the lower and upper bounds in constraint terms are ignored
        variable_term_to_row = ObjToRow((18, 18),
                                        None,
                                        ('name', 'type'))
        if header:
            return variable_term_to_row.header_rows()
        return variable_term_to_row.obj_as_row(variable_term)

    @classmethod
    def show_conv_opt_constraints(cls, constraints):
        rows = ['']
        rows.extend(cls.show_conv_opt_constraint(header=True))
        for id, constraint in constraints.items():
            rows.append(cls.show_conv_opt_constraint(constraint=constraint))
            rows.append('--- terms ---')
            for linear_term in constraint.terms:
                row = f'{linear_term.coefficient:<8}'
                row += cls.show_conv_opt_variable_term(variable_term=linear_term.variable)
                rows.append(row)
            rows.append('')
        return '\n'.join(rows)

    @classmethod
    def show_conv_opt_model(cls, conv_opt_model):
        """ Convert a `conv_opt` into a readable representation

        Args:
            conv_opt_model (:obj:`conv_opt.Model`): a convex optimization model

        Returns:
            :obj:`str`: a readable representation of `conv_opt_model`
        """
        conv_opt_model_rows = ['']
        conv_opt_model_rows.append('--- conf_opt model ---')
        conv_opt_model_rows.append(f"name: '{conv_opt_model.name}'")

        # variables: skip 'primal', 'reduced_cost', which aren't used
        conv_opt_model_rows.append('')
        conv_opt_model_rows.append('--- variables ---')
        conv_opt_model_rows.extend(cls.show_conv_opt_variable(header=True))
        for variable in conv_opt_model.variables:
            conv_opt_model_rows.append(cls.show_conv_opt_variable(variable=variable))

        # linear terms include Variable as a field, so just print the coefficient directly
        conv_opt_model_rows.append('')
        conv_opt_model_rows.append('--- constraints ---')
        conv_opt_model_rows.extend(cls.show_conv_opt_constraint(header=True))
        for constraint in conv_opt_model.constraints:
            conv_opt_model_rows.append(cls.show_conv_opt_constraint(constraint=constraint))
            conv_opt_model_rows.append('--- terms ---')
            for linear_term in constraint.terms:
                row = f'{linear_term.coefficient:<8}'
                row += cls.show_conv_opt_variable_term(variable_term=linear_term.variable)
                conv_opt_model_rows.append(row)
            conv_opt_model_rows.append('')

        conv_opt_model_rows.append(f'objective direction: {conv_opt_model.objective_direction.name}')

        conv_opt_model_rows.append('')
        conv_opt_model_rows.append('--- objective terms ---')
        for objective_term in conv_opt_model.objective_terms:
            row = f'{objective_term.coefficient:<8}'
            row += cls.show_conv_opt_variable_term(variable_term=objective_term.variable)
            conv_opt_model_rows.append(row)

        return '\n'.join(conv_opt_model_rows)