Source code for calliope.analysis
"""
Copyright (C) 2013-2017 Calliope contributors listed in AUTHORS.
Licensed under the Apache 2.0 License (see LICENSE file).
analysis.py
~~~~~~~~~~~
Functionality to analyze model results.
"""
import logging
try:
from matplotlib.patches import Rectangle
from matplotlib.colors import ListedColormap
except ImportError:
logging.debug('Matplotlib could not be imported, '
'no plotting will be available.')
import numpy as np
import pandas as pd
from . import analysis_utils as au
[docs]def plot_carrier_production(solution, carrier='power', subset=dict(),
**kwargs):
"""
Generate a stackplot of the production by the given ``carrier``.
Parameters
----------
solution : model solution xarray.Dataset
carrier : str, optional
Name of the carrier to plot, default 'power'.
subset : dict, optional
Specify an additional subset of Dataset coordinates, for example,
dict(t=slice('2005-02-01', '2005-02-10').
**kwargs : optional
Passed to ``plot_timeseries``.
"""
data = solution['e'].loc[dict(c=carrier, **subset)].sum(dim='x')
return plot_timeseries(solution, data, carrier=carrier, demand='demand_{}'.format(carrier), **kwargs)
[docs]def plot_timeseries(
solution, data, carrier='power', demand='demand_power',
tech_types=[
'supply', 'supply_plus', 'conversion',
'conversion_plus', 'storage', 'unmet_demand'],
colormap=None, ticks=None,
resample_options=None, resample_func=None,
add_legend=True, ax=None):
"""
Generate a stackplot of ``data`` for the given ``carrier``,
plotting ``demand`` on top.
Use ``plot_carrier_production`` for a simpler way to plot production by
a given carrier.
Parameters
----------
solution : model solution xarray.Dataset
data : xarray.Dataset
Subset of solution to plot.
carrier : str, optional
Name of the carrier to plot, default 'power'.
demand : str, optional
Name of a demand tech whose time series to plot on top,
default 'demand_power'.
tech_types : list, optional
Technology types to include in the plot. Default list is
['supply', 'supply_plus' , 'conversion', 'storage', 'unmet_demand'].
colormap : matplotlib colormap, optional
Colormap to use. If not given, the colors specified for each
technology in the solution's metadata are used.
ticks : str, optional
Where to draw x-axis (time axis) ticks. By default (None),
auto-detects, but can manually set to either 'hourly', 'daily',
or 'monthly'.
resample_options : dict, optional
Give options for pandas.DataFrame.resample in a dict, to resample
the entire time series prior to plotting. Both resample_options
and resample_func must be given for resampling to happen.
Default None.
resample_func : string, optional
Give the name of the aggregating function to use when resampling,
e.g. "mean" or "sum". Default None.
"""
# Determine ticks
if not ticks:
days = (data.coords['t'].values[-1] - data.coords['t'].values[0])
timespan = days / np.timedelta64(1, 'D')
# timespan = days.astype('timedelta64[D]').astype(int)
if timespan <= 2:
ticks = 'hourly'
elif timespan < 14:
ticks = 'daily'
else:
ticks = 'monthly'
# Set up time series to plot, dividing it by time_res_series
time_res = solution['time_res'].to_pandas()
data = data.to_pandas().T
plot_df = data.divide(time_res, axis='index')
if resample_options and resample_func:
plot_df = getattr(plot_df.resample(**resample_options), resample_func)()
df_metadata = solution['metadata'].to_pandas()
# Get pandas DataFrame based on carrier being plotted
df = df_metadata.query('carrier_in == "{}" or '
'carrier_out == "{}"'.format(carrier, carrier))
# Specifically add pertinent conversion_plus technologies to df
if 'conversion_plus' in tech_types:
df_cp = df_metadata.query('type == "conversion_plus"')
for y, columns in df_cp.iterrows():
if carrier in columns.carrier_in or carrier in columns.carrier_out:
df = df.append(df_metadata.loc[y])
df.drop_duplicates(inplace=True)
# Get tech stack and names
stacked_techs = df_metadata.query('type in {}'.format(tech_types)).index.tolist()
# Put stack in order according to stack_weights
weighted = df.stack_weight.sort_values(ascending=False).index.tolist()
stacked_techs = [y for y in weighted if y in stacked_techs]
names = [df.at[y, 'name'] for y in stacked_techs]
# If no colormap given, derive one from colors given in metadata
if not colormap:
colors = [df.at[i, 'color'] for i in stacked_techs]
colormap = ListedColormap(colors)
# Plot!
if ax is None:
ax = au.stack_plot(plot_df, stacked_techs, colormap=colormap,
alpha=0.9, ticks=ticks, legend=None, names=names)
else:
au.stack_plot(plot_df, stacked_techs, colormap=colormap,
alpha=0.9, ticks=ticks, legend=None, names=names,
ax=ax)
ax.plot(plot_df[demand].index,
plot_df[demand] * -1,
color='red', lw=1.5, ls='--', label=df.at[demand, 'name'])
# Add legend here rather than in stack_plot so we get demand too
if add_legend:
au.legend_outside_ax(ax, where='right')
return ax
[docs]def plot_installed_capacities(
solution,
tech_types=[
'supply', 'supply_plus', 'conversion',
'conversion_plus', 'storage'
],
unit_multiplier=1.0,
unit_label='kW',
**kwargs):
"""
Plot installed capacities (``e_cap``) with a bar plot.
Parameters
----------
solution : model solution xarray.Dataset
tech_types : list, optional
Technology types to include in the plot. Default is
['supply', 'supply_plus', 'conversion', 'conversion_plus', 'storage']
unit_multiplier : float or int, optional
Multiply installed capacities by this value for plotting.
Defaults to 1.0
unit_label : str, optional
Label for capacity values. Default is 'kW', adjust this
when changing ``unit_multiplier``.
**kwargs : optional
are passed to ``pandas.DataFrame.plot()``
"""
query_string = au._get_query_string(tech_types)
md = solution.metadata.to_pandas()
supply_cap = md.query(query_string).index.tolist()
df = solution['e_cap'].loc[dict(y=supply_cap)].to_pandas()
weighted = md.stack_weight.sort_values(ascending=False).index.tolist()
stacked_techs = [y for y in weighted if y in df.columns]
df = df.loc[:, stacked_techs] * unit_multiplier
names = [md.at[y, 'name'] for y in df.columns]
colors = [md.at[i, 'color'] for i in df.columns]
colormap = ListedColormap(colors)
proxies = [Rectangle((0, 0), 1, 1, fc=i)
for i in colors]
# Order the locations nicely, but only take those locations that actually
# exists in the current solution
if ('metadata' in solution.config_model and
'location_ordering' in solution.config_model.metadata):
meta_config = solution.config_model.metadata
for index, item in enumerate(meta_config.location_ordering):
if item in df.index:
df.at[item, 'ordering'] = index
df = df.sort_values(by='ordering', ascending=False)
df = df.drop('ordering', axis=1)
ax = df.plot(kind='barh', stacked=True, legend=False, colormap=colormap,
**kwargs)
leg = au.legend_outside_ax(ax, where='right', artists=proxies, labels=names)
ylab = ax.set_ylabel('')
xlab = ax.set_xlabel('Installed capacity ({})'.format(unit_label))
return ax
[docs]def plot_transmission(solution, tech='ac_transmission', carrier='power',
labels='utilization',
figsize=(15, 15), fontsize=9,
show_scale=True,
ax=None,
**kwargs):
"""
Plot transmission links on a map. Requires that model metadata have
been defined with a lat/lon for each model location and a boundary for
the map display.
Requires Basemap and NetworkX to be installed.
Parameters
----------
solution : solution container
tech : str, default 'ac_transmission'
Which transmission technology to plot.
carrier : str, default 'power'
Which carrier to plot transmission for.
labels : str, default 'utilization'
Determines how transmission links are labeled, either
`transmission` or `utilization`.
figsize : (int, int), default (15, 15)
Size of resulting figure.
fontsize : int, default 9
Font size of figure labels.
show_scale : bool, default True
Plot a distance scale on the map.
ax : matplotlib axes, default None
**kwargs : are passed to ``analysis_utils.plot_graph_on_map()``
"""
import networkx as nx
# Determine maximum that could have been transmitted across a link
def get_edge_capacity(solution, a, b):
hrs = solution['time_res'].to_pandas().sum()
cap = (solution['e_cap_net'].loc[dict(x=a, y='{}:'.format(tech) + b)]
.to_pandas() * hrs)
return cap
# Get annual power transmission between zones
zones = sorted(list(solution.coords['x'].values))
trans_tech = lambda x: '{}:{}'.format(tech, x)
def get_annual_power_transmission(zone):
try:
return (solution['e'].loc[dict(c=carrier, y=trans_tech(zone))]
.sum(dim='t').to_pandas())
except KeyError:
return 0
df = pd.DataFrame({zone: get_annual_power_transmission(zone)
for zone in zones}, index=zones)
# Set negative and very small (erroneous) values to zero --
# Positive values = energy received at a node by transmission,
# negative values = energy sent by transmission.
# The absolute value sent and recieved between two nodes will be
# different when there is line loss (i.e. negative values will have a
# larger absolute value than the positive value for the same transmission
# line), which is why we take the recieving (positive) value.
df[df < df.max().max()*1e-10] = 0
# Create directed graph
G = nx.from_numpy_matrix(df.as_matrix().T, create_using=nx.DiGraph())
G = nx.relabel_nodes(G, dict(list(zip(list(range(len(zones))), zones))))
# Transmission
edge_transmission = {edge: int(round(df.at[edge[1], edge[0]] / 1e6))
for edge in G.edges()}
# Utilization ratio
edge_use = {(a, b): (df.at[a, b] + df.at[b, a])
/ get_edge_capacity(solution, a, b)
for (a, b) in G.edges()}
# Set edge labels
if labels == 'utilization':
edge_labels = {k: '{:.2f}'.format(v) for k, v in edge_use.items()}
elif labels == 'transmission':
edge_labels = edge_transmission
# Set edge colors
edge_colors = [edge_use[i] for i in G.edges()]
ax, m = au.plot_graph_on_map(solution.config_model, G=G,
edge_colors=edge_colors,
edge_labels=edge_labels,
figsize=figsize, fontsize=fontsize,
ax=ax, show_scale=show_scale, **kwargs)
return ax
[docs]def get_delivered_cost(solution, cost_class='monetary', carrier='power',
count_unmet_demand=False):
"""
Get the levelized cost per unit of energy delivered for the given
``cost_class`` and ``carrier``.
Parameters
----------
solution : solution container
cost_class : str, default 'monetary'
carrier : str, default 'power'
count_unmet_demand : bool, default False
Whether to count the cost of unmet demand in the final
delivered cost.
"""
summary = solution.summary.to_pandas()
meta = solution.metadata.to_pandas()
carrier_subset = meta[meta.carrier_out == carrier].index.tolist()
if count_unmet_demand is False:
try:
carrier_subset.remove('unmet_demand_' + carrier)
except ValueError: # no unmet demand technology
pass
cost = (solution['costs'].loc[dict(k=cost_class, y=carrier_subset)]
.to_pandas().sum().sum())
# Actually, met_demand also includes demand "met" by unmet_demand
met_demand = summary.at['demand_' + carrier, 'e_con']
try:
unmet_demand = summary.at['unmet_demand_' + carrier, 'e_con']
except KeyError:
unmet_demand = 0
if count_unmet_demand is False:
demand = met_demand + unmet_demand # unmet_demand is positive, add it
else:
demand = met_demand
return cost / demand * -1
[docs]def get_levelized_cost(solution, cost_class='monetary', carrier='power',
groups=None, locations=None,
unit_multiplier=1.0):
"""
Get the levelized cost per unit of energy produced for the given
``cost_class`` and ``carrier``, optionally for a subset of technologies
given by ``groups`` and a subset of ``locations``.
Parameters
----------
solution : solution container
cost_class : str, default 'monetary'
carrier : str, default 'power'
groups : list, default None
Limit the computation to members of the given groups (see the
groups table in the solution for valid groups). Defaults to
['supply', 'supply_plus'] if not given.
locations : str or iterable, default None
Limit the computation to the given location or locations.
unit_multiplier : float or int, default 1.0
Adjust unit of the returned cost value. For example, if model units
are kW and kWh, ``unit_multiplier=1.0`` will return cost per kWh, and
``unit_multiplier=0.001`` will return cost per MWh.
"""
if groups is None:
groups = ['supply', 'supply_plus']
members = []
for g in groups:
try:
members.extend(solution.groups.to_pandas().at[g, 'members'].split('|'))
except KeyError:
pass
if locations is None:
locations_slice = slice(None)
elif isinstance(locations, (str, float, int)):
# Make sure that locations is a list if it's a single value
locations_slice = [locations]
else:
locations_slice = locations
cost = solution['costs'].loc[dict(k=cost_class, x=locations_slice, y=members)]
c_prod = solution['c_prod'].loc[dict(c=carrier, x=locations_slice, y=members)]
if locations is None:
cost = cost.sum(dim='x').to_pandas()
c_prod = c_prod.sum(dim='x').to_pandas()
else:
cost = cost.to_pandas()
c_prod = c_prod.to_pandas()
return (cost / c_prod) * unit_multiplier
[docs]def get_unmet_demand_hours(solution, carrier='power', details=False):
"""
Get information about unmet demand from ``solution``.
Parameters
----------
solution : solution container
carrier : str, default 'power'
details : bool, default False
By default, only the number of hours with unmet are returned. If
details is True, a dict with 'hours', 'timesteps', and 'dates' keys
is returned instead.
"""
unmet = (solution['e']
.loc[dict(c=carrier, y='unmet_demand_' + carrier)]
.sum(dim='x')
.to_pandas())
timesteps = len(unmet[unmet > 0])
hours = solution.time_res.to_pandas()[unmet > 0].sum()
if details:
return {'hours': hours, 'timesteps': timesteps,
'dates': unmet[unmet > 0].index}
else:
return hours
[docs]def areas_below_resolution(solution, resolution):
"""
Returns a list of (start, end) timestamp tuples delimiting those
areas in the solution below the given timestep resolution (in hours).
"""
# TODO: add unit tests
time_res = solution.time_res.to_pandas()
selected = time_res[time_res < resolution]
return list(au._get_ranges(selected.index.tolist()))
[docs]def get_swi(solution, shares_var='e_cap', exclude_patterns=['unmet_demand']):
"""
Returns the Shannon-Wiener diversity index.
:math:`SWI = -1 \\times \sum_{i=1}^{I} p_{i} \\times \ln(p_{i})`
where where I is the number of categories and :math:`p_{i}`
is each category's share of the total (between 0 and 1).
:math:`SWI` is zero when there is perfect concentration.
"""
# TODO: add unit tests
techs = au.get_supply_groups(solution)
for pattern in exclude_patterns:
techs = [t for t in techs if pattern not in t]
swi = -1 * sum((p * np.log(p))
for p in [solution.shares.to_pandas().at[y, shares_var]
for y in techs]
if p > 0)
return swi
[docs]def get_hhi(solution, shares_var='e_cap', exclude_patterns=['unmet_demand']):
"""
Returns the Herfindahl-Hirschmann diversity index.
:math:`HHI = \sum_{i=1}^{I} p_{i}^2`
where :math:`p_{i}` is the percentage share of each technology i (0-100).
:math:`HHI` ranges between 0 and 10,000. A value above 1800 is
considered a sign of a concentrated market.
"""
# TODO: add unit tests
techs = au.get_supply_groups(solution)
for pattern in exclude_patterns:
techs = [t for t in techs if pattern not in t]
hhi = sum((solution.shares.to_pandas().at[y, shares_var] * 100.) ** 2
for y in techs)
return hhi
[docs]def get_domestic_supply_index(solution):
"""
Assuming that ``solution`` specifies a ``domestic`` cost class to
give each technology a domesticity score, return the total domestic
supply index for the given solution.
"""
# TODO: add unit tests
idx = solution.metadata.query('type == "supply"').index.tolist()
dom = (solution.costs.loc[dict(k='domestic', y=idx)].sum().sum() /
solution['c_prod'].loc[dict(c='power')].sum().sum())
return dom
[docs]def map_results(results, func, as_frame=False):
"""
Applies ``func`` to each model solution in ``results``, returning
a pandas DataFrame (if as_frame is True) or Series,
indexed by the run names (if available).
"""
# TODO: add unit tests
def map_func(x):
try:
return func(x)
except Exception:
return np.nan
iterations = list(results.solutions.keys())
items = [map_func(results.solutions[i]) for i in iterations]
idx = list(results.solutions.keys())
try:
idx = results.iterations.loc[idx, 'name']
except Exception:
pass
if as_frame:
return pd.DataFrame(items, index=idx)
else:
return pd.Series(items, index=idx)