oemof/oemof-solph

View on GitHub
src/oemof/solph/flows/_simple_flow_block.py

Summary

Maintainability
F
1 wk
Test Coverage
# -*- coding: utf-8 -*-

"""Creating sets, variables, constraints and parts of the objective function
for Flow objects with neither nonconvex nor investment options.

SPDX-FileCopyrightText: Uwe Krien <krien@uni-bremen.de>
SPDX-FileCopyrightText: Simon Hilpert
SPDX-FileCopyrightText: Cord Kaldemeyer
SPDX-FileCopyrightText: Stephan Günther
SPDX-FileCopyrightText: Birgit Schachler
SPDX-FileCopyrightText: jnnr
SPDX-FileCopyrightText: jmloenneberga
SPDX-FileCopyrightText: Pierre-François Duc
SPDX-FileCopyrightText: Saeed Sayadi
SPDX-FileCopyrightText: Johannes Kochems

SPDX-License-Identifier: MIT

"""
from pyomo.core import BuildAction
from pyomo.core import Constraint
from pyomo.core import Expression
from pyomo.core import NonNegativeIntegers
from pyomo.core import NonNegativeReals
from pyomo.core import Set
from pyomo.core import Var
from pyomo.core.base.block import ScalarBlock


class SimpleFlowBlock(ScalarBlock):
    r"""Flow block with definitions for standard flows.

    See :class:`~oemof.solph.flows._flow.Flow` class for all parameters of the
    *Flow*.

    .. automethod:: _create_constraints
    .. automethod:: _create_variables
    .. automethod:: _create_sets

    .. automethod:: _objective_expression

    Note
    ----
    See the :class:`~oemof.solph.flows._flow.Flow` class for the definition of
    all parameters from the "List of Parameters above.

    """  # noqa: E501

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _create(self, group=None):
        r"""Creates sets, variables and constraints for all standard flows.

        Parameters
        ----------
        group : list
            List containing tuples containing flow (f) objects and the
            associated source (s) and target (t)
            of flow e.g. groups=[(s1, t1, f1), (s2, t2, f2),..]
        """
        if group is None:
            return None

        self._create_sets(group)
        self._create_variables(group)
        self._create_constraints()

    def _create_sets(self, group):
        """
        Creates all sets for standard flows.
        """
        self.FULL_LOAD_TIME_MAX_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].full_load_time_max is not None
                and g[2].nominal_value is not None
            ]
        )

        self.FULL_LOAD_TIME_MIN_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].full_load_time_min is not None
                and g[2].nominal_value is not None
            ]
        )

        self.NEGATIVE_GRADIENT_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].negative_gradient_limit[0] is not None
            ]
        )

        self.POSITIVE_GRADIENT_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].positive_gradient_limit[0] is not None
            ]
        )

        self.INTEGER_FLOWS = Set(
            initialize=[(g[0], g[1]) for g in group if g[2].integer]
        )

        self.LIFETIME_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].lifetime is not None and g[2].age is None
            ]
        )

        self.LIFETIME_AGE_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].lifetime is not None and g[2].age is not None
            ]
        )

    def _create_variables(self, group):
        r"""Creates all variables for standard flows.

        All *Flow* objects are indexed by a starting and ending node
        :math:`(i, o)`, which is omitted in the following for the sake of
        convenience. The creation of some variables depend on the values of
        *Flow* attributes. The following variables are created:

        * :math:`P(p, t)`
            Actual flow value (created in :class:`~oemof.solph._models.Model`).
            The variable is bound to:
            :math:`f_\mathrm{min}(t) \cdot P_\mathrm{nom}
            \le P(p, t)
            \le f_\mathrm{max}(t) \cdot P_\mathrm{nom}`.

            If `Flow.fix` is not None the variable is bound to
            :math:`P(p, t) = f_\mathrm{fix}(t) \cdot P_\mathrm{nom}`.

        * :math:`ve_n` (`Flow.negative_gradient` is not `None`)
            Difference of a flow in consecutive timesteps if flow is reduced.
            The variable is bound to: :math:`0 \ge ve_n \ge ve_n^{max}`.

        * :math:`ve_p` (`Flow.positive_gradient` is not `None`)
            Difference of a flow in consecutive timesteps if flow is increased.
            The variable is bound to: :math:`0 \ge ve_p \ge ve_p^{max}`.

        The following variable is build for Flows with the attribute
        `integer_flows` being not None.

        * :math:`i` (`Flow.integer` is `True`)
            All flow values are integers. Variable is bound to non-negative
            integers.
        """
        m = self.parent_block()

        self.positive_gradient = Var(
            self.POSITIVE_GRADIENT_FLOWS, m.TIMESTEPS, within=NonNegativeReals
        )

        self.negative_gradient = Var(
            self.NEGATIVE_GRADIENT_FLOWS, m.TIMESTEPS, within=NonNegativeReals
        )

        self.integer_flow = Var(
            self.INTEGER_FLOWS, m.TIMESTEPS, within=NonNegativeIntegers
        )
        # set upper bound of gradient variable
        for i, o, f in group:
            if m.flows[i, o].positive_gradient_limit[0] is not None:
                for t in m.TIMESTEPS:
                    self.positive_gradient[i, o, t].setub(
                        f.positive_gradient_limit[t] * f.nominal_value
                    )
            if m.flows[i, o].negative_gradient_limit[0] is not None:
                for t in m.TIMESTEPS:
                    self.negative_gradient[i, o, t].setub(
                        f.negative_gradient_limit[t] * f.nominal_value
                    )

    def _create_constraints(self):
        r"""Creates all constraints for standard flows.

        The following constraints are created, if the appropriate attribute of
        the *Flow* (see :class:`~oemof.solph.flows._flow.Flow`) object is set:

        * `Flow.full_load_time_max` is not `None` (full_load_time_max_constr):
            .. math::
                \sum_t P(t) \cdot \tau \leq F_{max} \cdot P_{nom}

        * `Flow.full_load_time_min` is not `None` (full_load_time_min_constr):
            .. math::
                \sum_t P(t) \cdot \tau \geq F_{min} \cdot P_{nom}

        * `Flow.negative_gradient` is not `None` (negative_gradient_constr):
            .. math::
              P(t-1) - P(t) \geq ve_n(t)

        * `Flow.positive_gradient` is not `None` (positive_gradient_constr):
            .. math::
              P(t) - P(t-1) \geq ve_p(t)

        * `Flow.integer` is `True`
            .. math::
              P(t) = i(t)
        """
        m = self.parent_block()

        def _flow_full_load_time_max_rule(model):
            """Rule definition for build action of max. sum flow constraint."""
            for inp, out in self.FULL_LOAD_TIME_MAX_FLOWS:
                lhs = sum(
                    m.flow[inp, out, p, ts] * m.timeincrement[ts]
                    for p, ts in m.TIMEINDEX
                )
                rhs = (
                    m.flows[inp, out].full_load_time_max
                    * m.flows[inp, out].nominal_value
                )
                self.full_load_time_max_constr.add((inp, out), lhs <= rhs)

        self.full_load_time_max_constr = Constraint(
            self.FULL_LOAD_TIME_MAX_FLOWS, noruleinit=True
        )
        self.full_load_time_max_build = BuildAction(
            rule=_flow_full_load_time_max_rule
        )

        def _flow_full_load_time_min_rule(model):
            """Rule definition for build action of min. sum flow constraint."""
            for inp, out in self.FULL_LOAD_TIME_MIN_FLOWS:
                lhs = sum(
                    m.flow[inp, out, p, ts] * m.timeincrement[ts]
                    for p, ts in m.TIMEINDEX
                )
                rhs = (
                    m.flows[inp, out].full_load_time_min
                    * m.flows[inp, out].nominal_value
                )
                self.full_load_time_min_constr.add((inp, out), lhs >= rhs)

        self.full_load_time_min_constr = Constraint(
            self.FULL_LOAD_TIME_MIN_FLOWS, noruleinit=True
        )
        self.full_load_time_min_build = BuildAction(
            rule=_flow_full_load_time_min_rule
        )

        def _positive_gradient_flow_rule(model):
            """Rule definition for positive gradient constraint."""
            for inp, out in self.POSITIVE_GRADIENT_FLOWS:
                for index in range(1, len(m.TIMEINDEX) + 1):
                    if m.TIMEINDEX.at(index)[1] > 0:
                        lhs = (
                            m.flow[
                                inp,
                                out,
                                m.TIMEINDEX.at(index)[0],
                                m.TIMEINDEX.at(index)[1],
                            ]
                            - m.flow[
                                inp,
                                out,
                                m.TIMEINDEX.at(index - 1)[0],
                                m.TIMEINDEX.at(index - 1)[1],
                            ]
                        )
                        rhs = self.positive_gradient[
                            inp, out, m.TIMEINDEX.at(index)[1]
                        ]
                        self.positive_gradient_constr.add(
                            (
                                inp,
                                out,
                                m.TIMEINDEX.at(index)[0],
                                m.TIMEINDEX.at(index)[1],
                            ),
                            lhs <= rhs,
                        )
                    else:
                        lhs = self.positive_gradient[inp, out, 0]
                        rhs = 0
                        self.positive_gradient_constr.add(
                            (
                                inp,
                                out,
                                m.TIMEINDEX.at(index)[0],
                                m.TIMEINDEX.at(index)[1],
                            ),
                            lhs == rhs,
                        )

        self.positive_gradient_constr = Constraint(
            self.POSITIVE_GRADIENT_FLOWS, m.TIMEINDEX, noruleinit=True
        )
        self.positive_gradient_build = BuildAction(
            rule=_positive_gradient_flow_rule
        )

        def _negative_gradient_flow_rule(model):
            """Rule definition for negative gradient constraint."""
            for inp, out in self.NEGATIVE_GRADIENT_FLOWS:
                for index in range(1, len(m.TIMEINDEX) + 1):
                    if m.TIMEINDEX.at(index)[1] > 0:
                        lhs = (
                            m.flow[
                                inp,
                                out,
                                m.TIMEINDEX.at(index - 1)[0],
                                m.TIMEINDEX.at(index - 1)[1],
                            ]
                            - m.flow[
                                inp,
                                out,
                                m.TIMEINDEX.at(index)[0],
                                m.TIMEINDEX.at(index)[1],
                            ]
                        )
                        rhs = self.negative_gradient[
                            inp, out, m.TIMEINDEX.at(index)[1]
                        ]
                        self.negative_gradient_constr.add(
                            (
                                inp,
                                out,
                                m.TIMEINDEX.at(index)[0],
                                m.TIMEINDEX.at(index)[1],
                            ),
                            lhs <= rhs,
                        )
                    else:
                        lhs = self.negative_gradient[inp, out, 0]
                        rhs = 0
                        self.negative_gradient_constr.add(
                            (
                                inp,
                                out,
                                m.TIMEINDEX.at(index)[0],
                                m.TIMEINDEX.at(index)[1],
                            ),
                            lhs == rhs,
                        )

        self.negative_gradient_constr = Constraint(
            self.NEGATIVE_GRADIENT_FLOWS, m.TIMEINDEX, noruleinit=True
        )
        self.negative_gradient_build = BuildAction(
            rule=_negative_gradient_flow_rule
        )

        def _integer_flow_rule(block, ii, oi, pi, ti):
            """Force flow variable to NonNegativeInteger values."""
            return self.integer_flow[ii, oi, ti] == m.flow[ii, oi, pi, ti]

        self.integer_flow_constr = Constraint(
            self.INTEGER_FLOWS, m.TIMEINDEX, rule=_integer_flow_rule
        )

        if m.es.periods is not None:

            def _lifetime_output_rule(block):
                """Force flow value to zero when lifetime is reached"""
                for inp, out in self.LIFETIME_FLOWS:
                    for p, ts in m.TIMEINDEX:
                        if m.flows[inp, out].lifetime <= m.es.periods_years[p]:
                            lhs = m.flow[inp, out, p, ts]
                            rhs = 0
                            self.lifetime_output.add(
                                (inp, out, p, ts), (lhs == rhs)
                            )

            self.lifetime_output = Constraint(
                self.LIFETIME_FLOWS, m.TIMEINDEX, noruleinit=True
            )
            self.lifetime_output_build = BuildAction(
                rule=_lifetime_output_rule
            )

            def _lifetime_age_output_rule(block):
                """Force flow value to zero when lifetime is reached
                considering initial age
                """
                for inp, out in self.LIFETIME_AGE_FLOWS:
                    for p, ts in m.TIMEINDEX:
                        if (
                            m.flows[inp, out].lifetime - m.flows[inp, out].age
                            <= m.es.periods_years[p]
                        ):
                            lhs = m.flow[inp, out, p, ts]
                            rhs = 0
                            self.lifetime_age_output.add(
                                (inp, out, p, ts), (lhs == rhs)
                            )

            self.lifetime_age_output = Constraint(
                self.LIFETIME_AGE_FLOWS, m.TIMEINDEX, noruleinit=True
            )
            self.lifetime_age_output_build = BuildAction(
                rule=_lifetime_age_output_rule
            )

    def _objective_expression(self):
        r"""Objective expression for all standard flows with fixed costs
        and variable costs.

        Depending on the attributes of the `Flow` object the following parts of
        the objective function are created for a standard model:

        * `Flow.variable_costs` is not `None`:
            .. math::
              \sum_{(i,o)} \sum_t P(t) \cdot w(t) \cdot c_{var}(i, o, t)

        where :math:`w(t)` is the objective weighting.

        In a multi-period model, in contrast, the following parts of
        the objective function are created:

        * `Flow.variable_costs` is not `None`:
            .. math::
              \sum_{(i,o)} \sum_{p, t} P(p, t) \cdot w(t)
              \cdot c_{var}(i, o, t)

        * `Flow.fixed_costs` is not `None` and flow has no lifetime limit
            .. math::
              \sum_{(i,o)} \displaystyle \sum_{pp=0}^{year_{max}}
              P_{nominal} \cdot c_{fixed}(i, o, pp) \cdot DF^{-pp}

        * `Flow.fixed_costs` is not `None` and flow has a lifetime limit,
           but not an initial age
            .. math::
              \sum_{(i,o)} \displaystyle \sum_{pp=0}^{limit_{exo}}
              P_{nominal} \cdot c_{fixed}(i, o, pp) \cdot DF^{-pp}

        * `Flow.fixed_costs` is not `None` and flow has a lifetime limit,
           and an initial age
            .. math::
              \sum_{(i,o)} \displaystyle \sum_{pp=0}^{limit_{exo}} P_{nominal}
              \cdot c_{fixed}(i, o, pp) \cdot DF^{-pp}

        Hereby

        * :math:`DF(p) = (1 + dr)` is the discount factor for period :math:`p`
          and :math:`dr` is the discount rate.
        * :math:`n` is the unit lifetime and :math:`a` is the initial age.
        * :math:`year_{max}` denotes the last year of the optimization
          horizon, i.e. at the end of the last period.
        * :math:`limit_{exo}=min\{year_{max}, n - a\}` is used as an
          upper bound to ensure fixed costs for existing capacities to occur
          within the optimization horizon. :math:`a` is the initial age
          of an asset (or 0 if not specified).
        """
        m = self.parent_block()

        variable_costs = 0
        fixed_costs = 0

        if m.es.periods is None:
            for i, o in m.FLOWS:
                if m.flows[i, o].variable_costs[0] is not None:
                    for p, t in m.TIMEINDEX:
                        variable_costs += (
                            m.flow[i, o, p, t]
                            * m.objective_weighting[t]
                            * m.flows[i, o].variable_costs[t]
                        )

        else:
            for i, o in m.FLOWS:
                if m.flows[i, o].variable_costs[0] is not None:
                    for p, t in m.TIMEINDEX:
                        variable_costs += (
                            m.flow[i, o, p, t]
                            * m.objective_weighting[t]
                            * m.flows[i, o].variable_costs[t]
                            * ((1 + m.discount_rate) ** -m.es.periods_years[p])
                        )

                # Fixed costs for units with no lifetime limit
                if (
                    m.flows[i, o].fixed_costs[0] is not None
                    and m.flows[i, o].nominal_value is not None
                    and (i, o) not in self.LIFETIME_FLOWS
                    and (i, o) not in self.LIFETIME_AGE_FLOWS
                ):
                    fixed_costs += sum(
                        m.flows[i, o].nominal_value
                        * m.flows[i, o].fixed_costs[pp]
                        * ((1 + m.discount_rate) ** (-pp))
                        for pp in range(m.es.end_year_of_optimization)
                    )

            # Fixed costs for units with limited lifetime
            for i, o in self.LIFETIME_FLOWS:
                if m.flows[i, o].fixed_costs[0] is not None:
                    range_limit = min(
                        m.es.end_year_of_optimization,
                        m.flows[i, o].lifetime,
                    )
                    fixed_costs += sum(
                        m.flows[i, o].nominal_value
                        * m.flows[i, o].fixed_costs[pp]
                        * ((1 + m.discount_rate) ** (-pp))
                        for pp in range(range_limit)
                    )

            for i, o in self.LIFETIME_AGE_FLOWS:
                if m.flows[i, o].fixed_costs[0] is not None:
                    range_limit = min(
                        m.es.end_year_of_optimization,
                        m.flows[i, o].lifetime - m.flows[i, o].age,
                    )
                    fixed_costs += sum(
                        m.flows[i, o].nominal_value
                        * m.flows[i, o].fixed_costs[pp]
                        * ((1 + m.discount_rate) ** (-pp))
                        for pp in range(range_limit)
                    )

        self.variable_costs = Expression(expr=variable_costs)
        self.fixed_costs = Expression(expr=fixed_costs)
        self.costs = Expression(expr=variable_costs + fixed_costs)

        return self.costs