Source code for calliope.backend.pyomo.constraints.group
"""
Copyright (C) since 2013 Calliope contributors listed in AUTHORS.
Licensed under the Apache 2.0 License (see LICENSE file).
group.py
~~~~~~~~
Group constraints.
"""
import logging
import numpy as np
import pyomo.core as po # pylint: disable=import-error
from calliope.backend.pyomo.util import (
loc_tech_is_in,
get_param,
invalid,
get_timestep_weight,
)
logger = logging.getLogger(__name__)
ORDER = 20 # order in which to invoke constraints relative to other constraint files
def return_noconstraint(*args):
logger.debug("group constraint returned NoConstraint: {}".format(",".join(args)))
return po.Constraint.NoConstraint
def load_constraints(backend_model):
model_data_dict = backend_model.__calliope_model_data["data"]
for sense in ["min", "max", "equals"]:
if "group_energy_cap_share_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_energy_cap_share_{}_constraint".format(sense),
po.Constraint(
getattr(
backend_model, "group_names_energy_cap_share_{}".format(sense)
),
[sense],
rule=energy_cap_share_constraint_rule,
),
)
if "group_energy_cap_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_energy_cap_{}_constraint".format(sense),
po.Constraint(
getattr(backend_model, "group_names_energy_cap_{}".format(sense)),
[sense],
rule=energy_cap_constraint_rule,
),
)
if "group_resource_area_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_resource_area_{}_constraint".format(sense),
po.Constraint(
getattr(
backend_model, "group_names_resource_area_{}".format(sense)
),
[sense],
rule=resource_area_constraint_rule,
),
)
if "group_carrier_prod_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_carrier_prod_{}_constraint".format(sense),
po.Constraint(
getattr(backend_model, "group_names_carrier_prod_{}".format(sense)),
[sense],
rule=carrier_prod_constraint_rule,
),
)
if "group_carrier_con_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_carrier_con_{}_constraint".format(sense),
po.Constraint(
getattr(backend_model, "group_names_carrier_con_{}".format(sense)),
[sense],
rule=carrier_con_constraint_rule,
),
)
if "group_demand_share_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_demand_share_{}_constraint".format(sense),
po.Constraint(
getattr(backend_model, "group_names_demand_share_{}".format(sense)),
[sense],
rule=demand_share_constraint_rule,
),
)
if "group_demand_share_per_timestep_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_demand_share_per_timestep_{}_constraint".format(sense),
po.Constraint(
getattr(
backend_model,
"group_names_demand_share_per_timestep_{}".format(sense),
),
backend_model.timesteps,
[sense],
rule=demand_share_per_timestep_constraint_rule,
),
)
if "group_carrier_prod_share_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_carrier_prod_share_{}_constraint".format(sense),
po.Constraint(
getattr(
backend_model, "group_names_carrier_prod_share_{}".format(sense)
),
[sense],
rule=carrier_prod_share_constraint_rule,
),
)
if "group_carrier_prod_share_per_timestep_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_carrier_prod_share_per_timestep_{}_constraint".format(sense),
po.Constraint(
getattr(
backend_model,
"group_names_carrier_prod_share_per_timestep_{}".format(sense),
),
backend_model.timesteps,
[sense],
rule=carrier_prod_share_per_timestep_constraint_rule,
),
)
if "group_net_import_share_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_net_import_share_{}_constraint".format(sense),
po.Constraint(
getattr(
backend_model, "group_names_net_import_share_{}".format(sense)
),
[sense],
rule=net_import_share_constraint_rule,
),
)
if "group_cost_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_cost_{}_constraint".format(sense),
po.Constraint(
getattr(backend_model, "group_names_cost_{}".format(sense)),
backend_model.costs,
[sense],
rule=cost_cap_constraint_rule,
),
)
if "group_cost_var_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_cost_var_{}_constraint".format(sense),
po.Constraint(
getattr(backend_model, "group_names_cost_var_{}".format(sense)),
backend_model.costs,
[sense],
rule=cost_var_cap_constraint_rule,
),
)
if "group_cost_investment_{}".format(sense) in model_data_dict:
setattr(
backend_model,
"group_cost_investment_{}_constraint".format(sense),
po.Constraint(
getattr(
backend_model, "group_names_cost_investment_{}".format(sense)
),
backend_model.costs,
[sense],
rule=cost_investment_cap_constraint_rule,
),
)
if "group_demand_share_per_timestep_decision" in model_data_dict:
backend_model.group_demand_share_per_timestep_decision_main_constraint = (
po.Constraint(
backend_model.group_names_demand_share_per_timestep_decision,
backend_model.loc_tech_carriers_demand_share_per_timestep_lhs,
backend_model.timesteps,
rule=demand_share_per_timestep_decision_main_constraint_rule,
)
)
backend_model.group_demand_share_per_timestep_decision_sum_constraint = (
po.Constraint(
backend_model.group_names_demand_share_per_timestep_decision,
rule=demand_share_per_timestep_decision_sum_constraint_rule,
)
)
def equalizer(lhs, rhs, sign):
if sign == "max":
return lhs <= rhs
elif sign == "min":
return lhs >= rhs
elif sign == "equals":
return lhs == rhs
else:
raise ValueError("Invalid sign: {}".format(sign))
def get_group_lhs_loc_tech_carriers(backend_model, group_name):
return getattr(
backend_model, "group_constraint_loc_tech_carriers_{}".format(group_name)
)
def get_group_lhs_and_rhs_loc_tech_carriers(backend_model, group_name):
lhs_loc_tech_carriers = getattr(
backend_model, "group_constraint_loc_tech_carriers_{}_lhs".format(group_name)
)
rhs_loc_tech_carriers = getattr(
backend_model, "group_constraint_loc_tech_carriers_{}_rhs".format(group_name)
)
return lhs_loc_tech_carriers, rhs_loc_tech_carriers
[docs]def carrier_prod_constraint_rule(backend_model, group_name, what):
"""
Enforces carrier_prod for groups of technologies and locations,
as a sum over the entire model period.
.. container:: scrolling-wrapper
.. math::
\\sum_{loc::tech::carrier \\in given\\_group, timestep \\in timesteps} carrier_{prod}(loc::tech::carrier, timestep) \\leq carrier_prod_max
"""
limit = get_param(backend_model, f"group_carrier_prod_{what}", (group_name))
if invalid(limit):
return return_noconstraint("carrier_prod", group_name)
else:
lhs_loc_tech_carriers = get_group_lhs_loc_tech_carriers(
backend_model, group_name
)
lhs = sum(
backend_model.carrier_prod[loc_tech_carrier, timestep]
for loc_tech_carrier in lhs_loc_tech_carriers
for timestep in backend_model.timesteps
)
return equalizer(lhs, limit, what)
[docs]def carrier_con_constraint_rule(backend_model, constraint_group, what):
"""
Enforces carrier_con for groups of technologies and locations,
as a sum over the entire model period. limits are always negative, so min/max
is relative to zero (i.e. min = -1 means carrier_con must be -1 or less)
.. container:: scrolling-wrapper
.. math::
\\sum_{loc::tech::carrier \\in given\\_group, timestep \\in timesteps} carrier_{con}(loc::tech::carrier, timestep) \\geq carrier_con_max
"""
limit = get_param(
backend_model, "group_carrier_con_{}".format(what), (constraint_group)
)
if invalid(limit):
return return_noconstraint("carrier_con", constraint_group)
else:
lhs_loc_tech_carriers = get_group_lhs_loc_tech_carriers(
backend_model, constraint_group
)
lhs = sum(
backend_model.carrier_con[loc_tech_carrier, timestep]
for loc_tech_carrier in lhs_loc_tech_carriers
for timestep in backend_model.timesteps
)
return equalizer(limit, lhs, what)
[docs]def energy_cap_constraint_rule(backend_model, constraint_group, what):
"""
Enforce upper and lower bounds for energy_cap of energy_cap
for groups of technologies and locations.
.. container:: scrolling-wrapper
.. math::
\\sum_{loc::tech \\in given\\_group} energy_{cap}(loc::tech) \\leq energy\\_cap\\_max\\\\
\\sum_{loc::tech \\in given\\_group} energy_{cap}(loc::tech) \\geq energy\\_cap\\_min
"""
threshold = get_param(
backend_model, "group_energy_cap_{}".format(what), (constraint_group)
)
if invalid(threshold):
return return_noconstraint("energy_cap", constraint_group)
else:
lhs_loc_techs = getattr(
backend_model, "group_constraint_loc_techs_{}".format(constraint_group)
)
# Transmission techs only contribute half their capacity in each direction
lhs = []
for loc_tech in lhs_loc_techs:
if loc_tech_is_in(backend_model, loc_tech, "loc_techs_transmission"):
weight = 0.5
else:
weight = 1
lhs.append(weight * backend_model.energy_cap[loc_tech])
rhs = threshold
return equalizer(sum(lhs), rhs, what)
[docs]def storage_cap_constraint_rule(backend_model, constraint_group, what):
"""
Enforce upper and lower bounds for storage_cap of storage_cap
for groups of technologies and locations.
.. container:: scrolling-wrapper
.. math::
\\sum_{loc::tech \\in given\\_group} storage_{cap}(loc::tech) \\leq storage\\_cap\\_max\\\\
\\sum_{loc::tech \\in given\\_group} storage_{cap}(loc::tech) \\geq storage\\_cap\\_min
"""
threshold = get_param(
backend_model, "group_storage_cap_{}".format(what), (constraint_group)
)
if invalid(threshold):
return return_noconstraint("storage_cap", constraint_group)
else:
lhs_loc_techs = getattr(
backend_model, "group_constraint_loc_techs_{}".format(constraint_group)
)
lhs = sum(backend_model.storage_cap[loc_tech] for loc_tech in lhs_loc_techs)
rhs = threshold
return equalizer(lhs, rhs, what)
[docs]def cost_cap_constraint_rule(backend_model, group_name, cost, what):
"""
Limit cost for a specific cost class to a certain value,
i.e. Ɛ-constrained costs,
for groups of technologies and locations.
.. container:: scrolling-wrapper
.. math::
\\sum{loc::tech \\in loc\\_techs_{group\\_name}, timestep \\in timesteps}
\\boldsymbol{cost}(cost, loc::tech, timestep)
\\begin{cases}
\\leq cost\\_max(cost)
\\geq cost\\_min(cost)
= cost\\_equals(cost)
\\end{cases}
"""
cost_cap = get_param(
backend_model, "group_cost_{}".format(what), (cost, group_name)
)
if invalid(cost_cap):
return return_noconstraint("cost_cap", group_name)
else:
loc_techs = [
i
for i in getattr(
backend_model, "group_constraint_loc_techs_{}".format(group_name)
)
if i in backend_model.loc_techs_cost
]
sum_cost = sum(backend_model.cost[cost, loc_tech] for loc_tech in loc_techs)
return equalizer(sum_cost, cost_cap, what)
[docs]def cost_investment_cap_constraint_rule(backend_model, group_name, cost, what):
"""
Limit investment costs specific to a cost class to a
certain value, i.e. Ɛ-constrained costs,
for groups of technologies and locations.
.. container:: scrolling-wrapper
.. math::
\\sum{loc::tech \\in loc\\_techs_{group\\_name}, timestep \\in timesteps}
\\boldsymbol{cost\\_{investment}}(cost, loc::tech, timestep)
\\begin{cases}
\\leq cost\\_investment\\_max(cost)
\\geq cost\\_investment\\_min(cost)
= cost\\_investment\\_equals(cost)
\\end{cases}
"""
cost_cap = get_param(
backend_model, "group_cost_investment_{}".format(what), (cost, group_name)
)
if invalid(cost_cap):
return return_noconstraint("cost_investment_cap", group_name)
else:
loc_techs = [
i
for i in getattr(
backend_model, "group_constraint_loc_techs_{}".format(group_name)
)
if i in backend_model.loc_techs_investment_cost
]
sum_cost = sum(
backend_model.cost_investment[cost, loc_tech] for loc_tech in loc_techs
)
return equalizer(sum_cost, cost_cap, what)
[docs]def cost_var_cap_constraint_rule(backend_model, group_name, cost, what):
"""
Limit variable costs specific to a cost class
to a certain value, i.e. Ɛ-constrained costs,
for groups of technologies and locations.
.. container:: scrolling-wrapper
.. math::
\\sum{loc::tech \\in loc\\_techs_{group\\_name}, timestep \\in timesteps}
\\boldsymbol{cost\\_{var}}(cost, loc::tech, timestep)
\\begin{cases}
\\leq cost\\_var\\_max(cost)
\\geq cost\\_var\\_min(cost)
= cost\\_var\\_equals(cost)
\\end{cases}
"""
cost_cap = get_param(
backend_model, "group_cost_var_{}".format(what), (cost, group_name)
)
if invalid(cost_cap):
return return_noconstraint("cost_var_cap", group_name)
else:
loc_techs = [
i
for i in getattr(
backend_model, "group_constraint_loc_techs_{}".format(group_name)
)
if i in backend_model.loc_techs_om_cost
]
sum_cost = sum(
backend_model.cost_var[cost, loc_tech, timestep]
for loc_tech in loc_techs
for timestep in backend_model.timesteps
)
return equalizer(sum_cost, cost_cap, what)
[docs]def resource_area_constraint_rule(backend_model, constraint_group, what):
"""
Enforce upper and lower bounds of resource_area for groups of
technologies and locations.
.. container:: scrolling-wrapper
.. math::
\\boldsymbol{resource_{area}}(loc::tech) \\leq group\\_resource\\_area\\_max\\\\
\\boldsymbol{resource_{area}}(loc::tech) \\geq group\\_resource\\_area\\_min
"""
threshold = get_param(
backend_model, "group_resource_area_{}".format(what), (constraint_group)
)
if invalid(threshold):
return return_noconstraint("resource_area", constraint_group)
else:
lhs_loc_techs = getattr(
backend_model, "group_constraint_loc_techs_{}".format(constraint_group)
)
lhs = sum(backend_model.resource_area[loc_tech] for loc_tech in lhs_loc_techs)
rhs = threshold
return equalizer(lhs, rhs, what)