wc_model_gen/eukaryote/metabolism.py
""" Generator for metabolism submodel for eukaryotes
:Author: Yin Hoon Chew <yinhoon.chew@mssm.edu>
:Date: 2020-01-21
:Copyright: 2020, Karr Lab
:License: MIT
"""
from wc_onto import onto as wc_ontology
from wc_utils.util.units import unit_registry
import wc_model_gen.utils as utils
import collections
import conv_opt
import math
import numpy
import scipy.constants
import wc_kb
import wc_lang
import wc_model_gen
class MetabolismSubmodelGenerator(wc_model_gen.SubmodelGenerator):
""" Generator for metabolism submodel
Options:
* recycled_metabolites (:obj:`dict`): a dictionary with species IDs of metabolites
to be recycled as keys and recycled amounts in copy number as values
* carbohydrate_components (:obj:`dict`): a dictionary with species IDs of carbohydrate
metabolite components as keys and their relative compositions as values
* lipid_components (:obj:`dict`): a dictionary with species IDs of lipid
metabolite components as keys and their relative compositions as values
* atp_production (:obj:`float`): ATP requirement in copy number per cell cycle per cell;
if not provided, it will be calculated from other generated submodels
* amino_acid_ids (:obj:`list`): amino acid metabolite ids
* media_fluxes(:obj:`dict`): dictionary with reaction ids as keys and tuples of
the lower and upper bounds based on measured fluxes (M/s) as values
* exchange_reactions (:obj:`list`): IDs of exchange/demand/sink reactions
* scale_factor (:obj:`float`): scaling factor multiplied by the reaction bounds during
calibration; the default value is 1.0
* coef_scale_factor (:obj:`float`): scaling factor multiplied by the species coefficients
in the objective function during calibration; the default value is 1.0
* optimization_type (:obj:`bool`): if True, linear optimization is used during submodel
calibration, else a quadratic optimization is used; default is True
* beta (:obj:`float`, optional): ratio of Michaelis-Menten constant to substrate
concentration (Km/[S]) for use when estimating Km values, the default value is 1
* tolerance (:obj:`float`, optional): the upper limit of difference between calibrated and
measured growth as a fraction of the measured growth that can be tolerated,
the default value is 0.01
* kcat_adjustment_factor (:obj:`float`, optional): factor for adjusting the values of kcat
imputed based on flux variability analysis; the adjustment is only made for bounds
that have not been relaxed during calibration; the default value is 1, i.e.
no adjustment will be made
"""
def clean_and_validate_options(self):
""" Apply default options and validate options """
options = self.options
recycled_metabolites = options.get('recycled_metabolites', {})
options['recycled_metabolites'] = recycled_metabolites
carbohydrate_components = options.get('carbohydrate_components', {})
options['carbohydrate_components'] = carbohydrate_components
lipid_components = options.get('lipid_components', {})
options['lipid_components'] = lipid_components
atp_production = options.get('atp_production', 0.)
options['atp_production'] = atp_production
amino_acid_ids = options.get('amino_acid_ids', [])
options['amino_acid_ids'] = amino_acid_ids
media_fluxes = options.get('media_fluxes', {})
options['media_fluxes'] = media_fluxes
exchange_reactions = options.get('exchange_reactions', [])
options['exchange_reactions'] = exchange_reactions
scale_factor = options.get('scale_factor', 1.)
options['scale_factor'] = scale_factor
coef_scale_factor = options.get('coef_scale_factor', 1.)
options['coef_scale_factor'] = coef_scale_factor
optimization_type = options.get('optimization_type', True)
options['optimization_type'] = optimization_type
beta = options.get('beta', 1.)
options['beta'] = beta
tolerance = options.get('tolerance', 0.01)
options['tolerance'] = tolerance
kcat_adjustment_factor = options.get('kcat_adjustment_factor', 1.)
options['kcat_adjustment_factor'] = kcat_adjustment_factor
def gen_reactions(self):
""" Generate reactions associated with submodel
Exchange reactions for components in the media will be be created if they do
not exist. The maximum and minimum flux bounds for exchange reactions will also be set.
A biomass reaction is generated by accounting for all the metabolites
that are consumed and produced by the reactions in other submodels, and the
metabolites that are in the free pool.
"""
cell = self.knowledge_base.cell
model = self.model
submodel = self.submodel
media_fluxes = self.options['media_fluxes']
exchange_reactions = self.options['exchange_reactions']
macro_submodel = model.submodels.get_or_create(id='macromolecular_formation',
framework=wc_ontology['WC:next_reaction_method'])
cytosol = model.compartments.get_one(id='c')
print('Start generating metabolism submodel...')
# Create exchange reactions for components in the media if they do not exist
for exchange_rxn in media_fluxes:
reaction_id = exchange_rxn + '_kb'
if not model.reactions.get_one(id=reaction_id):
kb_rxn = cell.reactions.get_one(id=exchange_rxn)
model_rxn = model.reactions.create(
submodel=submodel,
id=reaction_id,
name=kb_rxn.name,
reversible=kb_rxn.reversible,
comments=kb_rxn.comments)
for participant in kb_rxn.participants:
kb_species = participant.species
model_species_type = model.species_types.get_one(id=kb_species.species_type.id)
model_compartment = model.compartments.get_one(id=kb_species.compartment.id)
model_species = model.species.get_or_create(
species_type=model_species_type, compartment=model_compartment)
model_rxn.participants.add(
model_species.species_coefficients.get_or_create(coefficient=participant.coefficient))
# Set the flux bounds of exchange/demand/sink reactions
for reaction in submodel.reactions:
if reaction.id[:-3] in exchange_reactions:
reaction.flux_bounds = wc_lang.FluxBounds(units=unit_registry.parse_units('M s^-1'))
# Set the flux bounds of exchange reactions for media components to the measured fluxes
if reaction.id[:-3] in media_fluxes:
bounds = media_fluxes[reaction.id[:-3]]
reaction.flux_bounds.min = math.nan if bounds[0]==None else bounds[0]
reaction.flux_bounds.max = math.nan if bounds[1]==None else bounds[1]
# Set the exchange reactions for other media components to be unbounded
elif reaction.participants[0].species.distribution_init_concentration:
reaction.flux_bounds.max = math.nan
if reaction.reversible:
reaction.flux_bounds.min = math.nan
else:
reaction.flux_bounds.min = 0.
# Set the bounds of other exchange/demand/sink reactions to zero
else:
reaction.flux_bounds.min = 0.
reaction.flux_bounds.max = 0.
if reaction.flux_bounds.min == 0.:
reaction.reversible = False
# Create biomass reaction
biomass_rxn = model.dfba_obj_reactions.create(
submodel=submodel,
id='biomass_reaction',
name='Biomass reaction',
units=unit_registry.parse_units('s^-1'),
cell_size_units=unit_registry.parse_units('l'))
# Add metabolites in the free pool to the LHS of biomass reaction
for conc in model.distribution_init_concentrations:
if conc.species.species_type.type == wc_ontology['WC:metabolite'] and \
conc.species.compartment.id!='e':
biomass_rxn.dfba_obj_species.add(
conc.species.dfba_obj_species.get_or_create(model=model, value=-conc.mean))
# Add metabolites to be recycled to the RHS of biomass reaction
for met_id, amount in self.options['recycled_metabolites'].items():
model_species = model.species.get_one(id=met_id)
model_species_coefficient = biomass_rxn.dfba_obj_species.get_one(species=model_species)
if model_species_coefficient:
model_species_coefficient.value += amount
else:
biomass_rxn.dfba_obj_species.add(
model_species.dfba_obj_species.get_or_create(model=model, value=amount))
# Add carbohydrate components to the LHS of biomass reaction and create carbohydrate formation reaction
carbohydrate_st = model.species_types.create(id='carbohydrate', name='carbohydrate',
type = wc_ontology['WC:pseudo_species'], structure = wc_lang.ChemicalStructure())
carbohydrate_species = model.species.create(species_type=carbohydrate_st, compartment=cytosol)
carbohydrate_species.id = carbohydrate_species.gen_id()
conc_model = model.distribution_init_concentrations.create(
species=carbohydrate_species,
mean=1,
units=unit_registry.parse_units('molecule'),
)
conc_model.id = conc_model.gen_id()
carb_rxn = model.reactions.create(
submodel=macro_submodel,
id='carbohydrate_formation',
name='carbohydrate formation',
reversible=False)
carb_rxn.participants.add(carbohydrate_species.species_coefficients.create(coefficient=1))
water_st = model.species_types.get_one(id='h2o')
water_weight = water_st.structure.molecular_weight
unscaled_mass = 0
for met_id, rel_amount in self.options['carbohydrate_components'].items():
model_species = model.species.get_one(id=met_id)
unscaled_mass += (model_species.species_type.structure.molecular_weight - \
water_weight) * rel_amount
scale_factor = (cell.parameters.get_one(id='total_carbohydrate_mass').value - \
water_weight / scipy.constants.Avogadro) / unscaled_mass
charge = 0
weight = 0
monomer_no = 0
for met_id, rel_amount in self.options['carbohydrate_components'].items():
amount = rel_amount * scale_factor * scipy.constants.Avogadro
monomer_no += amount
model_species = model.species.get_one(id=met_id)
charge += model_species.species_type.structure.charge * amount
weight += (model_species.species_type.structure.molecular_weight - \
water_weight) * amount
carb_rxn.participants.add(model_species.species_coefficients.create(coefficient=-amount))
model_species_coefficient = biomass_rxn.dfba_obj_species.get_one(species=model_species)
if model_species_coefficient:
model_species_coefficient.value -= amount
else:
biomass_rxn.dfba_obj_species.add(
model_species.dfba_obj_species.get_or_create(model=model, value=-amount))
carbohydrate_st.structure.molecular_weight = weight + water_weight
carbohydrate_st.structure.charge = round(charge)
# Add water as product of carbohydrate synthesis and to be recycled in the biomass reaction
water_species = water_st.species.get_one(compartment=cytosol)
carb_rxn.participants.add(water_species.species_coefficients.get_or_create(
coefficient=monomer_no - 1))
water_species_coefficient = biomass_rxn.dfba_obj_species.get_one(species=water_species)
if water_species_coefficient:
water_species_coefficient.value += monomer_no - 1
else:
biomass_rxn.dfba_obj_species.add(
water_species.dfba_obj_species.get_or_create(model=model, value=monomer_no - 1))
# Add lipid components to the LHS of biomass reaction and create lipid formation reaction
lipid_st = model.species_types.create(id='lipid', name='lipid',
type = wc_ontology['WC:pseudo_species'], structure = wc_lang.ChemicalStructure())
lipid_species = model.species.create(species_type=lipid_st, compartment=cytosol)
lipid_species.id = lipid_species.gen_id()
conc_model = model.distribution_init_concentrations.create(
species=lipid_species,
mean=1,
units=unit_registry.parse_units('molecule'),
)
conc_model.id = conc_model.gen_id()
lipid_rxn = model.reactions.create(
submodel=macro_submodel,
id='lipid_formation',
name='lipid formation',
reversible=False)
lipid_rxn.participants.add(lipid_species.species_coefficients.create(coefficient=1))
unscaled_mass = 0
for met_id, rel_amount in self.options['lipid_components'].items():
model_species = model.species.get_one(id=met_id)
unscaled_mass += model_species.species_type.structure.molecular_weight * rel_amount
scale_factor = cell.parameters.get_one(id='total_lipid_mass').value / unscaled_mass
charge = 0
weight = 0
for met_id, rel_amount in self.options['lipid_components'].items():
amount = rel_amount * scale_factor * scipy.constants.Avogadro
model_species = model.species.get_one(id=met_id)
charge += model_species.species_type.structure.charge * amount
weight += model_species.species_type.structure.molecular_weight * amount
lipid_rxn.participants.add(model_species.species_coefficients.create(coefficient=-amount))
model_species_coefficient = biomass_rxn.dfba_obj_species.get_one(species=model_species)
if model_species_coefficient:
model_species_coefficient.value -= amount
else:
biomass_rxn.dfba_obj_species.add(
model_species.dfba_obj_species.get_or_create(model=model, value=-amount))
lipid_st.structure.molecular_weight = weight
lipid_st.structure.charge = round(charge)
# Determine the consumption(production) of metabolites in other submodels
metabolic_participants = ['atp', 'ctp', 'gtp', 'utp', 'datp', 'dttp', 'dgtp', 'dctp', 'ppi', 'amp', 'cmp',
'gmp', 'ump', 'h2o', 'h', 'adp', 'pi', 'gdp', 'selnp'] + self.options['amino_acid_ids']
met_requirement = {'{}[{}]'.format(i, j.id): 0 for i in metabolic_participants for j in model.compartments}
doubling_time = model.parameters.get_one(id='mean_doubling_time').value
rnas_kb = cell.species_types.get(__type=wc_kb.eukaryote.TranscriptSpeciesType)
for rna_kb in rnas_kb:
rna_product = model.species_types.get_one(id=rna_kb.id).species[0]
half_life = rna_kb.properties.get_one(property='half-life').get_value()
mean_concentration = rna_product.distribution_init_concentration.mean
if model.submodels.get_one(id='transcription'):
# Add ATP hydrolysis requirement for DNA melting and promoter escape by RNA polymerase II
transcription_init_reaction = model.reactions.get_one(id='transcription_initiation_{}'.format(rna_kb.id))
if transcription_init_reaction:
for part in transcription_init_reaction.participants:
if part.species.id in met_requirement:
met_requirement[part.species.id] += -part.coefficient * mean_concentration * \
(1 + doubling_time / half_life)
# Transcription elongation
transcription_el_reaction = model.reactions.get_one(id='transcription_elongation_{}'.format(rna_kb.id))
if transcription_el_reaction:
for part in transcription_el_reaction.participants:
if part.species.id in met_requirement:
met_requirement[part.species.id] += -part.coefficient * mean_concentration * \
(1 + doubling_time / half_life)
if model.submodels.get_one(id='rna_degradation'):
# RNA degradation
rna_deg_reaction = model.reactions.get_one(id='degradation_{}'.format(rna_kb.id))
for part in rna_deg_reaction.participants:
if part.species.id in met_requirement:
met_requirement[part.species.id] += -part.coefficient * mean_concentration * \
doubling_time / half_life
if rna_kb.protein:
protein_model = model.species_types.get_one(id=rna_kb.protein.id)
translation_compartment = rna_kb.species[0].compartment.id
prot_half_life = rna_kb.protein.properties.get_one(property='half-life').get_value()
complex_model_stoic = {model.species_types.get_one(id=i.id):j.coefficient for i in cell.species_types.get(
__type=wc_kb.core.ComplexSpeciesType) for j in i.subunits if j.species_type==rna_kb.protein}
total_concentration = sum([i.distribution_init_concentration.mean for i in protein_model.species]) + \
sum([i.distribution_init_concentration.mean*v for k,v in complex_model_stoic.items() for i in k.species \
if i.distribution_init_concentration])
if model.submodels.get_one(id='translation_translocation'):
# Translation initiation
translation_init_reaction = model.reactions.get_one(id='translation_initiation_{}'.format(rna_kb.id))
for part in translation_init_reaction.participants:
if part.species.id in met_requirement:
met_requirement[part.species.id] += -part.coefficient * total_concentration * \
(1 + doubling_time / prot_half_life)
# Translation elongation
translation_el_reaction = model.reactions.get_one(id='translation_elongation_{}'.format(rna_kb.id))
for part in translation_el_reaction.participants:
if part.species.id in met_requirement:
met_requirement[part.species.id] += -part.coefficient * total_concentration * \
(1 + doubling_time / prot_half_life)
for protein_sp in protein_model.species:
prot_concentration = protein_sp.distribution_init_concentration.mean
# Translocation
translocation_reaction = model.reactions.get_one(
id='translocation_{}_{}_to_{}'.format(
protein_model.id, translation_compartment, protein_sp.compartment.id))
if translocation_reaction:
for part in translocation_reaction.participants:
if part.species.id in met_requirement:
met_requirement[part.species.id] += -part.coefficient * prot_concentration * \
(1 + doubling_time / prot_half_life)
if model.submodels.get_one(id='protein_degradation'):
# Protein degradation
prot_deg_reaction = model.reactions.get_one(
id='{}_{}_degradation'.format(protein_model.id, protein_sp.compartment.id))
for part in prot_deg_reaction.participants:
if part.species.id in met_requirement:
met_requirement[part.species.id] += -part.coefficient * prot_concentration * \
doubling_time / prot_half_life
# Metabolite requirement as co-factor of complexes
complex_kb = cell.species_types.get(__type=wc_kb.core.ComplexSpeciesType)
for com in complex_kb:
for subunit in com.subunits:
if type(subunit.species_type)==wc_kb.MetaboliteSpeciesType:
met_id = subunit.species_type.id
coef = subunit.coefficient if subunit.coefficient else 1
model_com = model.species_types.get_one(id=com.id)
for sp in model_com.species:
if sp.distribution_init_concentration:
if sp.distribution_init_concentration.mean > 0.:
compartment = sp.compartment
model_met_id = met_id + f'[{compartment.id}]'
if model_met_id in met_requirement:
met_requirement[model_met_id] += sp.distribution_init_concentration.mean * \
coef
else:
met_requirement[model_met_id] = sp.distribution_init_concentration.mean * \
coef
# DNA replication
chromosomes = cell.species_types.get(__type=wc_kb.core.DnaSpeciesType)
for chrom in chromosomes:
compartment_id = 'm' if 'M' in chrom.id else 'n'
seq = chrom.get_seq()
l = len(seq)
n_a = seq.upper().count('A')
n_c = seq.upper().count('C')
n_g = seq.upper().count('G')
n_t = seq.upper().count('T')
n_n = seq.upper().count('N')
known_bases = n_a + n_c + n_g + n_t
n_a += round(n_a / known_bases * n_n)
n_c += round(n_c / known_bases * n_n)
n_g += round(n_g / known_bases * n_n)
n_t = l - (n_a + n_c + n_g)
if chrom.double_stranded:
n_a = n_a + n_t
n_t = n_a
n_c = n_c + n_g
n_g = n_c
strand_no = 2
else:
strand_no = 1
dntp_count = {
'datp[{}]'.format(compartment_id): n_a * chrom.ploidy,
'dctp[{}]'.format(compartment_id): n_c * chrom.ploidy,
'dgtp[{}]'.format(compartment_id): n_g * chrom.ploidy,
'dttp[{}]'.format(compartment_id): n_t * chrom.ploidy,
}
for base, count in dntp_count.items():
met_requirement[base] += count
met_requirement['ppi[{}]'.format(compartment_id)] += -((l-1) * strand_no * chrom.ploidy)
# Use whole-cell ATP usage instead if provided
if self.options['atp_production']:
met_requirement['atp[c]'] = self.options['atp_production']
# Add consumption(production) of metabolites in other submodels to the LHS(RHS) of biomass reaction
for met_id, amount in met_requirement.items():
if amount:
model_species = model.species.get_one(id=met_id)
model_species_coefficient = biomass_rxn.dfba_obj_species.get_one(species=model_species)
if model_species_coefficient:
model_species_coefficient.value -= amount
else:
biomass_rxn.dfba_obj_species.add(
model_species.dfba_obj_species.get_or_create(model=model, value=-amount))
for species in biomass_rxn.dfba_obj_species:
species.id = species.gen_id()
species.units = unit_registry.parse_units('molecule cell^-1')
# Add biomass reaction as objective function
submodel.dfba_obj = wc_lang.DfbaObjective(model=model)
submodel.dfba_obj.id = submodel.dfba_obj.gen_id()
obj_expression = biomass_rxn.id
dfba_obj_expression, error = wc_lang.DfbaObjectiveExpression.deserialize(
obj_expression, {wc_lang.DfbaObjReaction: {biomass_rxn.id: biomass_rxn}})
assert error is None, str(error)
submodel.dfba_obj.expression = dfba_obj_expression
print('Biomass reaction has been generated and set as the objective function')
def gen_rate_laws(self):
""" Generate rate laws for carbohydrate and lipid formation reactions. High
rates are assumed so that the macromolecules are formed as soon as the
components are available.
"""
model = self.model
cytosol = model.compartments.get_one(id='c')
beta = self.options['beta']
# Rate laws for carbohydrate and lipid formation
for reaction in model.submodels.get_one(id='macromolecular_formation').reactions:
for reactant in reaction.get_reactants():
if not reactant.distribution_init_concentration:
conc_model = model.distribution_init_concentrations.create(
species=reactant,
mean=0.,
units=unit_registry.parse_units('molecule'),
comments='Set to zero assuming there is no free pool concentration')
conc_model.id = conc_model.gen_id()
substrates = [[i.species_type.id] for i in reaction.get_reactants()]
expressions, all_species, all_parameters, all_volumes, all_observables = utils.gen_response_functions(
model, beta, reaction.id, 'macromolecular', cytosol, substrates)
k_cat = model.parameters.create(
id='k_cat_{}'.format(reaction.id),
value=2e06,
type=wc_ontology['WC:k_cat'],
units=unit_registry.parse_units('s^-1'),
comments = 'A high rate constant was assigned so that the simulated ' \
'rate of macromolecular formation will be within the higher range'
)
all_parameters[k_cat.id] = k_cat
expression = '{} * {} * 2**{}'.format(
k_cat.id,
' * '.join(expressions),
len(expressions),
)
rate_law_expression, error = wc_lang.RateLawExpression.deserialize(expression, {
wc_lang.Species: all_species,
wc_lang.Parameter: all_parameters,
wc_lang.Function: all_volumes,
})
assert error is None, str(error)
rate_law = model.rate_laws.create(
direction=wc_lang.RateLawDirection.forward,
type=None,
expression=rate_law_expression,
reaction=reaction,
)
rate_law.id = rate_law.gen_id()
def calibrate_submodel(self):
""" Calibrate the submodel by adjusting measured kinetic constants to achieve
the measured growth rate while minimizing the total necessary adjustment.
Kinetic constants that have no measured values are then imputed based on
values determined by Flux Variability Analysis.
"""
model = self.model
scale_factor = self.options['scale_factor']
coef_scale_factor = self.options['coef_scale_factor']
tolerance = self.options['tolerance']
measured_growth = math.log(2)/model.parameters.get_one(id='mean_doubling_time').value
conv_model, conv_variables, lb_adjustable, ub_adjustable = self.conv_for_optim()
result = conv_model.solve()
growth = result.value/scale_factor*coef_scale_factor
print('Optimized flux through biomass reaction before calibration is {}'.format(growth))
if not numpy.isnan(growth):
# Restrict bounds if underconstrained
if growth > measured_growth:
flux_range = self.flux_variability_analysis(
conv_model, fraction_of_objective=measured_growth / growth)
conv_model, _, _, _ = self.conv_for_optim()
result = conv_model.solve()
calibrated_growth = result.value/scale_factor*coef_scale_factor
self.options['kcat_adjustment_factor'] = measured_growth / calibrated_growth
self.impute_kinetic_constant(flux_range)
conv_model, _, _, _ = self.conv_for_optim()
result = conv_model.solve()
growth = result.value/scale_factor*coef_scale_factor
print('Optimized flux through biomass reaction after calibration is {}'.format(growth))
# Relax bounds if overconstrained
lower = upper = {}
while (measured_growth - growth)/measured_growth > tolerance:
target = {'biomass_reaction': measured_growth}
lower, upper = self.relax_bounds(target, lb_adjustable, ub_adjustable)
fixed_values = {}
for reaction_id, adjustment in lower.items():
conv_variables[reaction_id].lower_bound -= adjustment*scale_factor
fixed_values[reaction_id] = conv_variables[reaction_id].lower_bound
for reaction_id, adjustment in upper.items():
conv_variables[reaction_id].upper_bound += adjustment*scale_factor
fixed_values[reaction_id] = conv_variables[reaction_id].upper_bound
# Impute kinetic constants with no measured values based on range values from Flux Variability Analysis
if any(numpy.isnan(i) for i in lower.values()) or any(numpy.isnan(i) for i in upper.values()):
print('No solution is found during model calibration')
break
else:
flux_range = self.flux_variability_analysis(conv_model, fixed_values=fixed_values)
self.impute_kinetic_constant(flux_range)
conv_model, _, _, _ = self.conv_for_optim()
result = conv_model.solve()
growth = result.value/scale_factor*coef_scale_factor
print('Optimized flux through biomass reaction after relaxation is {}'.format(growth))
def determine_bounds(self):
""" Determine the minimum and maximum bounds for each reaction. The bounds will be
scaled according to the provided scale factor.
Returns:
:obj:`dict`: dictionary with reaction IDs as keys and tuples of scaled minimum
and maximum bounds as values
:obj:`list`: list of IDs of reactions whose lower bounds are fully determined from
measured kinetic data and enzyme concentrations are not zero
:obj:`list`: list of IDs of reactions whose upper bounds are fully determined from
measured kinetic data and enzyme concentrations are not zero
"""
model = self.model
submodel = self.submodel
scale_factor = self.options['scale_factor']
cell_volume = model.parameters.get_one(id='cell_volume').value
# Determine all the reaction bounds
reaction_bounds = {}
lower_bound_adjustable = []
upper_bound_adjustable = []
for reaction in submodel.reactions:
# Set the bounds of exchange/demand/sink reactions
if reaction.flux_bounds:
rxn_bounds = reaction.flux_bounds
if rxn_bounds.min:
min_constr = None if numpy.isnan(rxn_bounds.min) else \
rxn_bounds.min*cell_volume*scipy.constants.Avogadro*scale_factor
else:
min_constr = rxn_bounds.min
if rxn_bounds.max:
max_constr = None if numpy.isnan(rxn_bounds.max) else \
rxn_bounds.max*cell_volume*scipy.constants.Avogadro*scale_factor
else:
max_constr = rxn_bounds.max
# Set the bounds of reactions with measured kinetic constants
elif reaction.rate_laws:
for_ratelaw = reaction.rate_laws.get_one(direction=wc_lang.RateLawDirection.forward)
if for_ratelaw:
if all(i.distribution_init_concentration.mean==0. for i in for_ratelaw.expression.species):
max_constr = 0.
elif not any(numpy.isnan(p.value) for p in for_ratelaw.expression.parameters):
max_constr = for_ratelaw.expression._parsed_expression.eval({
wc_lang.Species: {i.id: i.distribution_init_concentration.mean \
for i in for_ratelaw.expression.species}
}) * scale_factor
upper_bound_adjustable.append(reaction.id)
else:
max_constr = None
else:
max_constr = None
rev_ratelaw = reaction.rate_laws.get_one(direction=wc_lang.RateLawDirection.backward)
if rev_ratelaw:
if all(i.distribution_init_concentration.mean==0. for i in rev_ratelaw.expression.species):
min_constr = 0.
elif not any(numpy.isnan(p.value) for p in rev_ratelaw.expression.parameters):
min_constr = -rev_ratelaw.expression._parsed_expression.eval({
wc_lang.Species: {i.id: i.distribution_init_concentration.mean \
for i in rev_ratelaw.expression.species}
}) * scale_factor
lower_bound_adjustable.append(reaction.id)
else:
min_constr = None
elif reaction.reversible:
min_constr = None
else:
min_constr = 0.
# Set other reactions to be unbounded
else:
max_constr = None
if reaction.reversible:
min_constr = None
else:
min_constr = 0.
reaction_bounds[reaction.id] = (min_constr, max_constr)
return reaction_bounds, lower_bound_adjustable, upper_bound_adjustable
def conv_for_optim(self):
""" Convert metabolism reactions into an optimization problem model
Returns:
:obj:`conv_opt.Model`: a conv_opt model for optimization
:obj:`dict`: a dictionary with variable name as keys and
conv_opt variable objects as values
:obj:`list`: list of IDs of reactions whose lower bounds are fully determined from
measured kinetic data and enzyme concentrations are not zero
:obj:`list`: list of IDs of reactions whose upper bounds are fully determined from
measured kinetic data and enzyme concentrations are not zero
"""
model = self.model
submodel = self.submodel
coef_scale_factor = self.options['coef_scale_factor']
optimization_type = self.options['optimization_type']
self._reaction_bounds, lower_bound_adjustable, upper_bound_adjustable = self.determine_bounds()
# Formulate the optimization problem using the conv_opt package
conv_model = conv_opt.Model(name='model')
conv_variables = {}
conv_metabolite_matrices = collections.defaultdict(list)
for reaction in submodel.reactions:
conv_variables[reaction.id] = conv_opt.Variable(
name=reaction.id, type=conv_opt.VariableType.continuous,
lower_bound=self._reaction_bounds[reaction.id][0],
upper_bound=self._reaction_bounds[reaction.id][1])
conv_model.variables.append(conv_variables[reaction.id])
for part in reaction.participants:
conv_metabolite_matrices[part.species.id].append(
conv_opt.LinearTerm(conv_variables[reaction.id],
part.coefficient))
biomass_reaction = submodel.dfba_obj_reactions[0]
conv_variables[biomass_reaction.id] = conv_opt.Variable(
name=biomass_reaction.id, type=conv_opt.VariableType.continuous, lower_bound=0)
conv_model.variables.append(conv_variables[biomass_reaction.id])
for part in biomass_reaction.dfba_obj_species:
conv_metabolite_matrices[part.species.id].append(
conv_opt.LinearTerm(conv_variables[biomass_reaction.id],
part.value*coef_scale_factor))
for met_id, expression in conv_metabolite_matrices.items():
conv_model.constraints.append(conv_opt.Constraint(expression, name=met_id,
upper_bound=0.0, lower_bound=0.0))
if optimization_type:
conv_model.objective_terms = [conv_opt.LinearTerm(
conv_variables[biomass_reaction.id], 1.),]
else:
conv_model.objective_terms.append(conv_opt.QuadraticTerm(
conv_variables[biomass_reaction.id], conv_variables[biomass_reaction.id], 1.))
conv_model.objective_direction = conv_opt.ObjectiveDirection.maximize
options = conv_opt.SolveOptions(
solver=conv_opt.Solver.cplex,
presolve=conv_opt.Presolve.on,
solver_options={
'cplex': {
'parameters': {
'emphasis': {
'numerical': 1,
},
'read': {
'scale': 1,
},
},
},
})
return conv_model, conv_variables, lower_bound_adjustable, upper_bound_adjustable
def relax_bounds(self, target, lower_bound_adjustable, upper_bound_adjustable):
""" Relax bounds to achieve set target flux(es) while minimizing the total necessary adjustment
to the flux bounds
Args:
target (:obj:`dict`): a dictionary of IDs of variables to be set and their target values
lower_bound_adjustable (:obj:`list`): list of IDs of variables whose lower bounds are to be adjusted
upper_bound_adjustable (:obj:`list`): list of IDs of variables whose upper bounds are to be adjusted
Returns:
:obj:`dict`: a dictionary of reaction IDs as keys and the necessary lower bound adjustments
as values
:obj:`dict`: a dictionary of reaction IDs as keys and the necessary upper bound adjustments
as values
"""
submodel = self.submodel
scale_factor = self.options['scale_factor']
coef_scale_factor = self.options['coef_scale_factor']
optimization_type = self.options['optimization_type']
conv_model = conv_opt.Model(name='temporary_model')
alpha_lower = {}
alpha_upper = {}
conv_fluxes = {}
conv_alpha = {}
conv_metabolite_matrices = collections.defaultdict(list)
adjusted = set(lower_bound_adjustable + upper_bound_adjustable)
for reaction in submodel.reactions:
min_constr = self._reaction_bounds[reaction.id][0]
max_constr = self._reaction_bounds[reaction.id][1]
if reaction.id in adjusted:
conv_alpha['alpha_' + reaction.id] = conv_opt.Variable(name='alpha_'+reaction.id,
type=conv_opt.VariableType.continuous,
lower_bound=0.0)
conv_model.variables.append(conv_alpha['alpha_' + reaction.id])
if reaction.id in lower_bound_adjustable and reaction.id in upper_bound_adjustable:
conv_fluxes[reaction.id] = conv_opt.Variable(
name=reaction.id, type=conv_opt.VariableType.continuous)
conv_model.constraints.append(conv_opt.Constraint([conv_opt.LinearTerm(conv_fluxes[reaction.id], 1),
conv_opt.LinearTerm(conv_alpha['alpha_'+reaction.id], -1)],
name=reaction.id+'_max',
upper_bound=max_constr))
conv_model.constraints.append(conv_opt.Constraint([conv_opt.LinearTerm(conv_fluxes[reaction.id], 1),
conv_opt.LinearTerm(conv_alpha['alpha_'+reaction.id], 1)],
name=reaction.id+'_min',
lower_bound=min_constr))
alpha_lower[reaction.id] = 0.
alpha_upper[reaction.id] = 0.
elif reaction.id in lower_bound_adjustable:
conv_fluxes[reaction.id] = conv_opt.Variable(name=reaction.id,
type=conv_opt.VariableType.continuous,
upper_bound=max_constr)
conv_model.constraints.append(conv_opt.Constraint([conv_opt.LinearTerm(conv_fluxes[reaction.id], 1),
conv_opt.LinearTerm(conv_alpha['alpha_' + reaction.id], 1)],
name=reaction.id+'_min',
lower_bound=min_constr))
alpha_lower[reaction.id] = 0.
elif reaction.id in upper_bound_adjustable:
conv_fluxes[reaction.id] = conv_opt.Variable(name=reaction.id,
type=conv_opt.VariableType.continuous,
lower_bound=min_constr)
conv_model.constraints.append(conv_opt.Constraint([conv_opt.LinearTerm(conv_fluxes[reaction.id], 1),
conv_opt.LinearTerm(conv_alpha['alpha_' + reaction.id], -1)],
name=reaction.id+'_max',
upper_bound=max_constr))
alpha_upper[reaction.id] = 0.
else:
conv_fluxes[reaction.id] = conv_opt.Variable(name=reaction.id, type=conv_opt.VariableType.continuous,
lower_bound=min_constr, upper_bound=max_constr)
else:
conv_fluxes[reaction.id] = conv_opt.Variable(name=reaction.id, type=conv_opt.VariableType.continuous,
lower_bound=min_constr, upper_bound=max_constr)
conv_model.variables.append(conv_fluxes[reaction.id])
for part in reaction.participants:
conv_metabolite_matrices[part.species.id].append(
conv_opt.LinearTerm(conv_fluxes[reaction.id],
part.coefficient))
biomass_reaction = submodel.dfba_obj_reactions[0]
conv_fluxes[biomass_reaction.id] = conv_opt.Variable(
name=biomass_reaction.id, type=conv_opt.VariableType.continuous, lower_bound=0)
conv_model.variables.append(conv_fluxes[biomass_reaction.id])
for part in biomass_reaction.dfba_obj_species:
conv_metabolite_matrices[part.species.id].append(
conv_opt.LinearTerm(conv_fluxes[biomass_reaction.id],
part.value*coef_scale_factor))
for met_id, expression in conv_metabolite_matrices.items():
conv_model.constraints.append(conv_opt.Constraint(expression, name=met_id,
upper_bound=0.0, lower_bound=0.0))
for variable_id, val in target.items():
if variable_id == 'biomass_reaction':
conv_model.constraints.append(
conv_opt.Constraint([conv_opt.LinearTerm(conv_fluxes[variable_id], 1)],
name=variable_id+'_target',
lower_bound=val*scale_factor/coef_scale_factor,
upper_bound=val*scale_factor/coef_scale_factor))
else:
conv_model.constraints.append(
conv_opt.Constraint([conv_opt.LinearTerm(conv_fluxes[variable_id], 1)],
name=variable_id+'_target',
lower_bound=val*scale_factor,
upper_bound=val*scale_factor))
for k, v in conv_alpha.items():
if optimization_type:
conv_model.objective_terms.append(conv_opt.LinearTerm(v, 1.))
else:
conv_model.objective_terms.append(conv_opt.QuadraticTerm(v, v, 1.))
conv_model.objective_direction = conv_opt.ObjectiveDirection.minimize
options = conv_opt.SolveOptions(
solver=conv_opt.Solver.cplex,
presolve=conv_opt.Presolve.on,
solver_options={
'cplex': {
'parameters': {
'emphasis': {
'numerical': 1,
},
'read': {
'scale': 1,
},
},
},
})
result = conv_model.solve()
for i, v in enumerate(conv_model.variables):
if v.name[:6] == 'alpha_' and v.primal:
if v.name[6:] in alpha_lower:
alpha_lower[v.name[6:]] = v.primal / scale_factor
if v.name[6:] in alpha_upper:
alpha_upper[v.name[6:]] = v.primal / scale_factor
alpha_lower = {k:v for k,v in alpha_lower.items() if v}
alpha_upper = {k:v for k,v in alpha_upper.items() if v}
return alpha_lower, alpha_upper
def flux_variability_analysis(self, conv_model, fraction_of_objective=1.0,
fixed_values=None, target_reactions=None):
""" Conduct flux variability analysis by:
1) Optimizing the model by maximizing the objective function
2) Setting the objective function to the optimal value
3) Determining the maximal and minimal fluxes for each reaction by
maximizing and minimizing the reaction
Args:
conv_model (:obj:`conv_opt.Model`): a conv_opt model for optimization
fraction_of_objective (:obj:`float`, optional): network state with respect
to the optimal solution, e.g. 0.9 maximal possible biomass production rate
(allowable range: 0.0-1.0, default = 1.0)
fixed_values (:obj:`dict`, optional): a dictionary of reaction IDs as keys
and the values at which the reaction fluxes are to be set
target_reactions (:obj:`list`, optional): a list of reaction IDs where FVA
will be conducted (the default is to conduct FVA on all reactions in the model)
Returns:
:obj:`dict`: a dictionary with reaction ids as keys and tuples containing the
minimum and maximum fluxes as values
"""
scale_factor = self.options['scale_factor']
coef_scale_factor = self.options['coef_scale_factor']
options = conv_opt.SolveOptions(
solver=conv_opt.Solver.cplex,
presolve=conv_opt.Presolve.on,
solver_options={
'cplex': {
'parameters': {
'emphasis': {
'numerical': 1,
},
'read': {
'scale': 1,
},
},
},
})
pre_solution = conv_model.solve()
fva_target = pre_solution.value*fraction_of_objective
model = conv_model.convert(options)
cplex_model = model.load(conv_model)
cplex_model.solve()
cplex_model.set_log_stream(None)
cplex_model.set_error_stream(None)
cplex_model.set_warning_stream(None)
cplex_model.set_results_stream(None)
cplex_model.variables.set_lower_bounds(cplex_model.objective.get_linear().index(1), fva_target)
cplex_model.variables.set_upper_bounds(cplex_model.objective.get_linear().index(1), fva_target)
if fixed_values:
for reaction, flux in fixed_values.items():
cplex_model.variables.set_lower_bounds(reaction, flux)
cplex_model.variables.set_upper_bounds(reaction, flux)
if target_reactions:
reaction_list = target_reactions
else:
reaction_list = [reaction for reaction in cplex_model.variables.get_names()]
flux_range = {}
cplex_model.objective.set_sense(cplex_model.objective.sense.maximize)
cplex_model.objective.set_linear('biomass_reaction', 0.0)
for reaction in reaction_list:
cplex_model.objective.set_linear(reaction, 1.0)
cplex_model.solve()
pre_solution = cplex_model.solution
max_val = min(pre_solution.get_objective_value(),
cplex_model.variables.get_upper_bounds(reaction)) / scale_factor
if reaction=='biomass_reaction':
max_val *= coef_scale_factor
flux_range[reaction] = (max_val,)
cplex_model.objective.set_linear(reaction, 0.0)
cplex_model.objective.set_sense(cplex_model.objective.sense.minimize)
for reaction in reaction_list:
cplex_model.objective.set_linear(reaction, 1.0)
cplex_model.solve()
pre_solution = cplex_model.solution
min_val = max(pre_solution.get_objective_value(),
cplex_model.variables.get_lower_bounds(reaction)) / scale_factor
if reaction=='biomass_reaction':
min_val *= coef_scale_factor
if min_val > flux_range[reaction][0]:
flux_range[reaction] += (flux_range[reaction][0],)
else:
flux_range[reaction] += (min_val,)
cplex_model.objective.set_linear(reaction, 0.0)
flux_range = {k: (v[1], v[0]) for k,v in flux_range.items()}
return flux_range
def impute_kinetic_constant(self, bound_values):
""" Impute the values of kcat that have not been measured.
Args:
bound_values (:obj:`dict`): Keys are reaction IDs and values are tuples
of the minimum and maximum bounds that would be used to impute kcat
"""
submodel = self.submodel
kcat_adjustment_factor = self.options['kcat_adjustment_factor']
median_kcat = numpy.median([k.value for i in submodel.reactions for j in i.rate_laws \
for k in j.expression.parameters if k.value and not numpy.isnan(k.value)])
self._law_bound_pairs = {}
for reaction in submodel.reactions:
if reaction.rate_laws:
bounds = bound_values[reaction.id]
for_ratelaw = reaction.rate_laws.get_one(direction=wc_lang.RateLawDirection.forward)
rev_ratelaw = reaction.rate_laws.get_one(direction=wc_lang.RateLawDirection.backward)
if for_ratelaw:
if bounds[1] >= 0:
self._law_bound_pairs[for_ratelaw] = bounds[1]
else:
self._law_bound_pairs[for_ratelaw] = 0.
if rev_ratelaw:
if bounds[0] <= 0:
self._law_bound_pairs[rev_ratelaw] = -bounds[0]
else:
self._law_bound_pairs[rev_ratelaw] = 0.
for law, value in self._law_bound_pairs.items():
complexes = law.expression.species
if value!=0.0:
if len(complexes) == 1:
comp = complexes[0]
kcat = law.expression.parameters[0]
conc = comp.distribution_init_concentration.mean
if conc != 0.0:
if numpy.isnan(kcat.value):
kcat.value = value/conc*kcat_adjustment_factor
kcat.comments = 'Value imputed based on FVA bound value ' +\
'and adjusted with a factor of {}'.format(kcat_adjustment_factor)
elif (value - kcat.value*conc)/value > 0.01:
kcat.value = value/conc
kcat.comments = 'Measured value adjusted to relax bound'
else:
if numpy.isnan(kcat.value):
kcat.value = median_kcat
kcat.comments = 'Value imputed as the median of measured k_cat values'
elif not any(numpy.isnan(kcat.value) for kcat in law.expression.parameters):
effective_vmax = 0
for comp in complexes:
kcat = [p for p in law.expression.parameters if comp.species_type.id in p.id][0]
effective_vmax += kcat.value*comp.distribution_init_concentration.mean
if effective_vmax and (value - effective_vmax)/value > 0.01:
for kcat in law.expression.parameters:
kcat.value = kcat.value*value/effective_vmax
kcat.comments = 'Measured value adjusted to relax bound'
elif all(numpy.isnan(kcat.value) for kcat in law.expression.parameters):
expressed_comp = [comp for comp in complexes if \
comp.distribution_init_concentration.mean!=0.0]
if len(expressed_comp) > 0:
shared_vmax = value/len(expressed_comp)
for comp in complexes:
kcat = [p for p in law.expression.parameters if comp.species_type.id in p.id][0]
if comp.distribution_init_concentration.mean!=0.0:
kcat.value = shared_vmax/comp.distribution_init_concentration.mean*\
kcat_adjustment_factor
kcat.comments = 'Value imputed based on FVA bound value ' +\
'and adjusted with a factor of {}'.format(kcat_adjustment_factor)
else:
kcat.value = median_kcat
kcat.comments = 'Value imputed as the median of measured k_cat values'
else:
known_vmax = 0
comp_kcat = {}
for comp in complexes:
kcat = [p for p in law.expression.parameters if comp.species_type.id in p.id][0]
if not numpy.isnan(kcat.value):
known_vmax += kcat.value*comp.distribution_init_concentration.mean
else:
comp_kcat[comp] = kcat
if known_vmax < value and (value - known_vmax)/value > 0.01:
unknown_vmax_count = len([comp for comp, kcat in comp_kcat.items() if \
comp.distribution_init_concentration.mean!=0.0])
if unknown_vmax_count > 0:
shared_vmax = (value - known_vmax)/unknown_vmax_count
for comp, kcat in comp_kcat.items():
if comp.distribution_init_concentration.mean!=0.0:
kcat.value = shared_vmax/comp.distribution_init_concentration.mean*\
kcat_adjustment_factor
kcat.comments = 'Value imputed based on FVA bound value ' +\
'and adjusted with a factor of {}'.format(kcat_adjustment_factor)
else:
kcat.value = median_kcat
kcat.comments = 'Value imputed as the median of measured k_cat values'
else:
for comp, kcat in comp_kcat.items():
kcat.value = median_kcat
kcat.comments = 'Value imputed as the median of measured k_cat values'
else:
for comp in complexes:
kcat = [p for p in law.expression.parameters if comp.species_type.id in p.id][0]
if numpy.isnan(kcat.value):
kcat.value = median_kcat
kcat.comments = 'Value imputed as the median of measured k_cat values'