Source code for calliope.utils
"""
Copyright (C) 2013-2017 Calliope contributors listed in AUTHORS.
Licensed under the Apache 2.0 License (see LICENSE file).
utils.py
~~~~~~~~
Various utility functions, particularly the AttrDict class (a subclass
of regular dict) used for managing model configuration.
"""
from contextlib import contextmanager
from io import StringIO
import functools
import logging
import os
import importlib
import sys
import numpy as np
import yaml
from . import exceptions
class __Missing(object):
def __repr__(self):
return ('MISSING')
def __nonzero__(self):
return False
_MISSING = __Missing()
def _yaml_load(src):
"""Load YAML from a file object or path with useful parser errors"""
if not isinstance(src, str):
try:
src_name = src.name
except AttributeError:
src_name = '<yaml stringio>'
# Force-load file streams as that allows the parser to print
# much more context when it encounters an error
src = src.read()
else:
src_name = '<yaml string>'
try:
return yaml.load(src)
except yaml.YAMLError:
logging.error('Parser error when reading YAML from {}.'.format(src_name))
raise
[docs]class AttrDict(dict):
"""
A subclass of ``dict`` with key access by attributes::
d = AttrDict({'a': 1, 'b': 2})
d.a == 1 # True
Includes a range of additional methods to read and write to YAML,
and to deal with nested keys.
"""
__getattr__ = dict.__getitem__
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
def __init__(self, source_dict=None):
super(AttrDict, self).__init__()
if isinstance(source_dict, dict):
self.init_from_dict(source_dict)
[docs] def copy(self):
"""Override copy method so that it returns an AttrDict"""
return AttrDict(dict(self).copy())
[docs] def init_from_dict(self, d):
"""
Initialize a new AttrDict from the given dict. Handles any
nested dicts by turning them into AttrDicts too::
d = AttrDict({'a': 1, 'b': {'x': 1, 'y': 2}})
d.b.x == 1 # True
"""
for k, v in d.items():
# First, keys must be strings, not ints
if isinstance(k, int):
k = str(k)
# Now, assign to the key, handling nested AttrDicts properly
if isinstance(v, dict):
self.set_key(k, AttrDict(v))
elif isinstance(v, list):
self.set_key(k, [i if not isinstance(i, dict) else AttrDict(i)
for i in v])
else:
self.set_key(k, v)
@classmethod
[docs] def from_yaml(cls, f, resolve_imports=True):
"""
Returns an AttrDict initialized from the given path or
file object ``f``, which must point to a YAML file.
If ``resolve_imports`` is True, ``import:`` statements are
resolved recursively, else they are treated like any other key.
When resolving import statements, anything defined locally
overrides definitions in the imported file.
"""
if isinstance(f, str):
with open(f, 'r') as src:
loaded = cls(_yaml_load(src))
else:
loaded = cls(_yaml_load(f))
if resolve_imports and 'import' in loaded:
for k in loaded['import']:
imported = cls.from_yaml(relative_path(k, f))
# loaded is added to imported (i.e. it takes precedence)
imported.union(loaded)
loaded = imported
# 'import' key no longer needed, so we drop it
loaded.pop('import', None)
return loaded
@classmethod
[docs] def from_yaml_string(cls, string):
"""
Returns an AttrDict initialized from the given string, which
must be valid YAML.
"""
return cls(_yaml_load(string))
[docs] def set_key(self, key, value):
"""
Set the given ``key`` to the given ``value``. Handles nested
keys, e.g.::
d = AttrDict()
d.set_key('foo.bar', 1)
d.foo.bar == 1 # True
"""
if '.' in key:
key, remainder = key.split('.', 1)
try:
self[key].set_key(remainder, value)
except KeyError:
self[key] = AttrDict()
self[key].set_key(remainder, value)
except AttributeError:
if self[key] is None: # If the value is None, we replace it
self[key] = AttrDict()
self[key].set_key(remainder, value)
# Else there is probably something there, and we don't just
# want to overwrite so stop and warn the user
else:
raise KeyError('Cannot set nested key on non-dict key.')
else:
self[key] = value
[docs] def get_key(self, key, default=_MISSING):
"""
Looks up the given ``key``. Like set_key(), deals with nested
keys.
If default is anything but ``_MISSING``, the given default is
returned if the key does not exist.
"""
if '.' in key:
# Nested key of form "foo.bar"
key, remainder = key.split('.', 1)
if default != _MISSING:
try:
value = self[key].get_key(remainder, default)
except KeyError:
# subdict exists, but doesn't contain key
return default
except AttributeError:
# key points to non-dict thing, so no get_key attribute
return default
else:
value = self[key].get_key(remainder)
else:
# Single, non-nested key of form "foo"
if default != _MISSING:
return self.get(key, default)
else:
return self[key]
return value
[docs] def del_key(self, key):
"""Delete the given key. Properly deals with nested keys."""
if '.' in key:
key, remainder = key.split('.', 1)
try:
del self[key][remainder]
except KeyError:
self[key].del_key(remainder)
else:
del self[key]
[docs] def as_dict(self, flat=False):
"""
Return the AttrDict as a pure dict (with nested dicts if
necessary).
"""
if not flat:
return self.as_dict_nested()
else:
return self.as_dict_flat()
def as_dict_nested(self):
d = {}
for k, v in self.items():
if isinstance(v, AttrDict):
d[k] = v.as_dict()
elif isinstance(v, list):
d[k] = [i if not isinstance(i, AttrDict) else i.as_dict()
for i in v]
else:
d[k] = v
return d
def as_dict_flat(self):
d = {}
keys = self.keys_nested()
for k in keys:
d[k] = self.get_key(k)
return d
[docs] def to_yaml(self, path=None, convert_objects=True, **kwargs):
"""
Saves the AttrDict to the given path as a YAML file.
If ``path`` is None, returns the YAML string instead.
Any additional keyword arguments are passed to the YAML writer,
so can use e.g. ``indent=4`` to override the default of 2.
``convert_objects`` (defaults to True) controls whether Numpy
objects should be converted to regular Python objects, so that
they are properly displayed in the resulting YAML output.
"""
if convert_objects:
result = self.copy()
for k in result.keys_nested():
# Convert numpy numbers to regular python ones
v = result.get_key(k)
if isinstance(v, np.floating):
result.set_key(k, float(v))
elif isinstance(v, np.integer):
result.set_key(k, int(v))
result = result.as_dict()
else:
result = self.as_dict()
if path is not None:
with open(path, 'w') as f:
yaml.dump(result, f, **kwargs)
else:
return yaml.dump(result, **kwargs)
[docs] def keys_nested(self, subkeys_as='list'):
"""
Returns all keys in the AttrDict, sorted, including the keys of
nested subdicts (which may be either regular dicts or AttrDicts).
If ``subkeys_as='list'`` (default), then a list of
all keys is returned, in the form ``['a', 'b.b1', 'b.b2']``.
If ``subkeys_as='dict'``, a list containing keys and dicts of
subkeys is returned, in the form ``['a', {'b': ['b1', 'b2']}]``.
"""
keys = []
for k, v in sorted(self.items()):
if isinstance(v, AttrDict) or isinstance(v, dict):
if subkeys_as == 'list':
keys.extend([k + '.' + kk for kk in v.keys_nested()])
elif subkeys_as == 'dict':
keys.append({k: v.keys_nested(subkeys_as=subkeys_as)})
else:
keys.append(k)
return keys
[docs] def union(self, other, allow_override=False, allow_replacement=False):
"""
Merges the AttrDict in-place with the passed ``other``
AttrDict. Keys in ``other`` take precedence, and nested keys
are properly handled.
If ``allow_override`` is False, a KeyError is raised if
other tries to redefine an already defined key.
If ``allow_replacement``, allow "_REPLACE_" key to replace an
entire sub-dict.
"""
if allow_replacement:
WIPE_KEY = '_REPLACE_'
override_keys = [k for k in other.keys_nested()
if WIPE_KEY not in k]
wipe_keys = [k.split('.' + WIPE_KEY)[0]
for k in other.keys_nested()
if WIPE_KEY in k]
else:
override_keys = other.keys_nested()
wipe_keys = []
for k in override_keys:
if not allow_override and k in self.keys_nested():
raise KeyError('Key defined twice: {}'.format(k))
else:
self.set_key(k, other.get_key(k))
for k in wipe_keys:
self.set_key(k, other.get_key(k + '.' + WIPE_KEY))
@contextmanager
def capture_output():
"""
Capture stdout and stderr output of a wrapped function::
with capture_output() as out:
# do things that create stdout or stderr output
Returns a list with the captured strings: ``[stderr, stdout]``
"""
old_out, old_err = sys.stdout, sys.stderr
try:
out = [StringIO(), StringIO()]
sys.stdout, sys.stderr = out
yield out
finally:
sys.stdout, sys.stderr = old_out, old_err
out[0] = out[0].getvalue()
out[1] = out[1].getvalue()
# This used to be a custom function, but as of Python 3.2 we can use
# the built-in lru_cache for simplicity
memoize = functools.lru_cache(maxsize=512)
class memoize_instancemethod(object):
"""
Cache the return value of a method on a per-instance basis
(as opposed to a per-class basis like functools.lru_cache does)
Source: http://code.activestate.com/recipes/577452/
This class is meant to be used as a decorator of methods. The return
value from a given method invocation will be cached on the instance
whose method was invoked. All arguments passed to a method decorated
with memoize must be hashable.
If a memoized method is invoked directly on its class the result
will not be cached. Instead the method will be invoked like a
static method.
"""
def __init__(self, func):
self.func = func
def __get__(self, obj, objtype=None):
if obj is None:
return self.func
return functools.partial(self, obj)
def __call__(self, *args, **kw):
obj = args[0]
try:
cache = obj.__cache
except AttributeError:
cache = obj.__cache = {}
key = (self.func, args[1:], frozenset(list(kw.items())))
try:
res = cache[key]
except KeyError:
res = cache[key] = self.func(*args, **kw)
return res
def relative_path(path, base_path_file):
"""
If ``path`` is not absolute, it is interpreted as relative to the
path of the given ``base_path_file``.
"""
# Check if base_path_file is a string because it might be an AttrDict
if not os.path.isabs(path) and isinstance(base_path_file, str):
path = os.path.join(os.path.dirname(base_path_file), path)
return path
def _load_function(source):
"""
Returns a function from a module, given a source string of the form:
'module.submodule.subsubmodule.function_name'
"""
module_string, function_string = source.rsplit('.', 1)
modules = [i for i in sys.modules.keys() if 'calliope' in i]
# Check if module already loaded, if so, don't re-import it
if (module_string in modules):
module = sys.modules[module_string]
elif ('calliope.' + module_string) in modules:
module = sys.modules['calliope.' + module_string]
# Else load the module
else:
try:
module = importlib.import_module(module_string)
except ImportError:
module = importlib.import_module('calliope.' + module_string)
return getattr(module, function_string)
def plugin_load(name, builtin_module):
try: # First try importing as a third-party module
func = _load_function(name)
except ValueError:
# ValueError raised if we got a string without '.',
# which implies a builtin function,
# so we attempt to load from the given module
func_string = builtin_module + '.' + name
func = _load_function(func_string)
return func
def option_getter(config_model):
"""Returns a get_option() function using the given config_model and data"""
def get_option(option, x=None, default=None, ignore_inheritance=False):
def _get_option(opt, fail=False):
try:
result = config_model.get_key('techs.' + opt)
except KeyError:
if ignore_inheritance:
return _get_option(default, fail)
# 'ccgt.constraints.s_time' -> 'ccgt', 'constraints.s_time'
tech, remainder = opt.split('.', 1)
if ':' in tech: # transmission
parent = tech.split(':')[0]
else:
# parent = e.g. 'defaults'
parent = config_model.get_key('techs.' + tech + '.parent')
try:
result = _get_option(parent + '.' + remainder, fail)
except KeyError:
e = exceptions.OptionNotSetError
if fail:
raise e('Failed to read option `{}` '
'with given default '
'`{}`'.format(option, default))
elif default:
if not isinstance(default, str): #allow setting the default directly as anything that isn't a string - could do with being more robust
result = default
else:
result = _get_option(default, fail=True)
elif tech == 'defaults':
raise e('Reached top of inheritance chain '
'and no default defined for: '
'`{}`'.format(option))
else:
raise e('Can not get parent for `{}` '
'and no default defined '
'({}).'.format(tech, option))
return result
def _get_location_option(key, location):
# NB1: KeyErrors raised here are always caught in get_option
# so need no further information or handling
return config_model.get_key(
'locations.' + location + '.override.' + key
)
if x:
try:
result = _get_location_option(option, x)
# If can't find a location-specific option, fall back to model-wide
except KeyError:
result = _get_option(option)
else:
result = _get_option(option)
# Deal with 'inf' settings
if result == 'inf':
result = float('inf')
return result
return get_option
def cost_getter(option_getter_func):
def get_cost(cost, y, k, x=None, costs_type='costs'):
return option_getter_func(y + '.' + costs_type + '.' + k + '.' + cost,
default=y + '.' + costs_type + '.default.' + cost,
x=x)
return get_cost
def cost_per_distance_getter(config_model):
option_getter_func = option_getter(config_model)
def get_cost_per_distance(cost, y, k, x):
_cost = cost_getter(option_getter_func)
cost = _cost(cost, y, k, x, costs_type='costs_per_distance')
tech, x2 = y.split(':')
per_distance = option_getter_func(y + '.per_distance')
link = config_model.get_key('links.'+ x + ',' + x2,
default=config_model['links'].get(x2 + ',' + x))
# link = None if no link exists
if not link or tech not in link.keys():
return 0
try:
distance = link.get_key(tech + '.distance')
except KeyError:
if cost > 0:
e = exceptions.OptionNotSetError
raise e('Distance must be defined for link: {} '
'and transmission tech: {}, as cost_per_distance '
'is defined'.format(x + ',' + x2, tech))
else:
return 0
distance_cost = cost * (distance / per_distance)
return distance_cost
return get_cost_per_distance
def depreciation_getter(option_getter_func):
def get_depreciation_rate(y, k):
interest = option_getter_func(
y + '.depreciation.interest.' + k,
default=y + '.depreciation.interest.default')
plant_life = option_getter_func(y + '.depreciation.plant_life')
if interest == 0:
dep = 1 / plant_life
else:
dep = ((interest * (1 + interest) ** plant_life)
/ (((1 + interest) ** plant_life) - 1))
return dep
return get_depreciation_rate
def any_option_getter(model):
"""
Get any option from the given Model, including
``costs.`` or ``costs_per_distance.`` options
"""
get_cost = cost_getter(model.get_option)
get_cost_pd = cost_per_distance_getter(model.get_option)
def get_any_option(option, x=None, t=None):
if 'costs.' in option:
if t and x:
y, cost_type, k, cost = option.split('.')
try:
return model.data['_'.join([cost_type,k,cost])].sel(y=y,x=x,t=t)
except:
return model.get_cost(cost, y, k, x=x, costs_type=cost_type)
elif t and not x:
e = exceptions.OptionNotSetError
raise e('must define location for time dependant variable '
'`{}`'.format(option))
else:
y, cost_type, k, cost = option.split('.')
return model.get_cost(cost, y, k, x=x, costs_type=cost_type)
else:
if t and x:
y = option.split('.', 1)[0]
field = option.rsplit('.', 1)[-1]
try:
return model.data[field].sel(y=y,x=x,t=t)
except:
return model.get_option(option, x=x)
elif t and not x:
e = exceptions.OptionNotSetError
raise e('must define location for time dependant variable '
'`{}`'.format(option))
elif x and not t:
return model.get_option(option, x=x)
else:
if 'costs_per_distance.' in option:
y, rest, k, cost = option.split('.')
return get_cost_pd(cost, y, k)
else:
return model.get_option(option)
return get_any_option
def vincenty(coord1, coord2):
"""
Vincenty's inverse method formula to calculate the distance in metres
between two points on the surface of a spheroid (WGS84).
modified from https://github.com/maurycyp/vincenty
"""
a = 6378137 # equitorial radius in meters
f = 1 / 298.257223563 # flattening from sphere to oblate spheroid
b = a * (1 - f) # polar radius in meters
max_iter = 200
thresh = 1e-12
# short-circuit coincident points
if coord1[0] == coord2[0] and coord1[1] == coord2[1]:
return 0
U1 = np.arctan((1 - f) * np.tan(np.radians(coord1[0])))
U2 = np.arctan((1 - f) * np.tan(np.radians(coord2[0])))
L = np.radians(coord2[1] - coord1[1])
Lambda = L
sinU1 = np.sin(U1)
cosU1 = np.cos(U1)
sinU2 = np.sin(U2)
cosU2 = np.cos(U2)
for iteration in range(max_iter):
sinLambda = np.sin(Lambda)
cosLambda = np.cos(Lambda)
sinSigma = np.sqrt((cosU2 * sinLambda) ** 2 +
(cosU1 * sinU2 - sinU1 * cosU2 * cosLambda) ** 2)
if sinSigma == 0:
return 0.0 # coincident points
cosSigma = sinU1 * sinU2 + cosU1 * cosU2 * cosLambda
sigma = np.arctan2(sinSigma, cosSigma)
sinAlpha = cosU1 * cosU2 * sinLambda / sinSigma
cosSqAlpha = 1 - sinAlpha ** 2
try:
cos2SigmaM = cosSigma - 2 * sinU1 * sinU2 / cosSqAlpha
except ZeroDivisionError:
cos2SigmaM = 0
C = f / 16 * cosSqAlpha * (4 + f * (4 - 3 * cosSqAlpha))
LambdaPrev = Lambda
Lambda = L + (1 - C) * f * sinAlpha * (sigma + C * sinSigma *
(cos2SigmaM + C * cosSigma *
(-1 + 2 * cos2SigmaM ** 2)))
if abs(Lambda - LambdaPrev) < thresh:
break # successful convergence
else:
return None # failure to converge
uSq = cosSqAlpha * (a ** 2 - b ** 2) / (b ** 2)
A = 1 + uSq / 16384 * (4096 + uSq * (-768 + uSq * (320 - 175 * uSq)))
B = uSq / 1024 * (256 + uSq * (-128 + uSq * (74 - 47 * uSq)))
deltaSigma = B * sinSigma * (cos2SigmaM + B / 4 * (cosSigma *
(-1 + 2 * cos2SigmaM ** 2) - B / 6 * cos2SigmaM *
(-3 + 4 * sinSigma ** 2) * (-3 + 4 * cos2SigmaM ** 2)))
D = b * A * (sigma - deltaSigma)
return round(D)