Source code for calliope.core.time.masks

"""
Copyright (C) 2013-2018 Calliope contributors listed in AUTHORS.
Licensed under the Apache 2.0 License (see LICENSE file).

masks.py
~~~~~~~~

Functions to pick timesteps from data given certain criteria.

"""

import pandas as pd

from calliope.core.time import funcs
from calliope.core.util.dataset import split_loc_techs
from calliope import exceptions


def _get_array(data, var, tech, **kwargs):
    subset = {'techs': tech}
    if kwargs is not None:
        subset.update({k: v for k, v in kwargs.items()})

    unusable_dims = (
        set(subset.keys())
        .difference(["techs", "locs"])
        .difference(data[var].dims)
    )
    if unusable_dims:
        raise exceptions.ModelError(
            'Attempting to mask time based on  technology {}, '
            'but dimension(s) {} do not exist for parameter {}'.format(
                tech, unusable_dims, var.name)
        )

    arr = split_loc_techs(data[var].copy()).loc[subset]
    arr = arr.mean(dim=[i for i in arr.dims if i is not 'timesteps']).to_pandas()
    return arr


def zero(data, tech, var='resource', **kwargs):
    """
    Returns timesteps where ``var`` for the technology ``tech`` is zero.

    kwargs are additional dimensions to subset on, for example,
    ``locs=['location1', 'location2]``

    """
    s = _get_array(data, var, tech, **kwargs)

    return s[s == 0].index


def _concat_indices(indices):
    return pd.concat([i.to_series() for i in indices]).sort_index().index


def _get_minmax_timestamps(series, length, n, how='max', padding=None):
    # Get the max/min timestamps
    group = series.groupby(pd.Grouper(freq=length)).mean()
    timesteps = []
    for _ in range(n):
        if how == 'max':
            ts = group.idxmax()
        elif how == 'min':
            ts = group.idxmin()
        timesteps.append(ts)
        group = group.drop(ts)

    # Get range of timestamps including padding
    full_timesteps = []
    for ts in timesteps:
        ts_end = ts + pd.Timedelta(length)
        if padding is not None:
            ts -= pd.Timedelta(padding)
            ts_end += pd.Timedelta(padding)
        ts_range = series[ts:ts_end].index[:-1]
        full_timesteps.append(ts_range)

    ts_index = _concat_indices(full_timesteps)

    return ts_index


[docs]def extreme(data, tech, var='resource', how='max', length='1D', n=1, groupby_length=None, padding=None, normalize=True, **kwargs): """ Returns timesteps for period of ``length`` where ``var`` for the technology ``tech`` across the given list of ``locations`` is either minimal or maximal. Parameters ---------- data : xarray.Dataset tech : str Technology whose `var` to find extreme for. var : str, optional default 'resource' how : str, optional 'max' (default) or 'min'. length : str, optional Defaults to '1D'. n : int, optional Number of periods of `length` to look for, default is 1. groupby_length : str, optional Group time series and return `n` periods of `length` for each group. padding : str, optional Either Pandas frequency (e.g. '1D') or 'calendar_week'. If Pandas frequency, symmetric padding is undertaken, either side of `length` If 'calendar_week', padding is fit to the calendar week in which the extreme day(s) are found. normalize : bool, optional If True (default), data is normalized using :func:`~calliope.core.time.funcs.normalized_copy`. kwargs : dict, optional Dimensions of the selected var over which to index. Any remaining dimensions will be flattened by mean """ if normalize: # Only normalise the desired var as rest of data may contain # non-numeric variables! data_n = funcs.normalized_copy(data[var].to_dataset(name=var)) else: data_n = data arr = _get_array(data_n, var, tech, **kwargs) return _extreme_with_padding(arr, how, length, n, groupby_length, padding)
[docs]def extreme_diff(data, tech0, tech1, var='resource', how='max', length='1D', n=1, groupby_length=None, padding=None, normalize=True, **kwargs): """ Returns timesteps for period of ``length`` where the diffence in extreme value for ``var`` between technologies ``tech0`` and ``tech1`` is either a minimum or a maximum. Parameters ---------- data : xarray.Dataset tech0 : str First technology for which we find the extreme of `var` tech1 : str Second technology for which we find the extreme of `var` var : str, optional default 'resource' how : str, optional 'max' (default) or 'min'. length : str, optional Defaults to '1D'. n : int, optional Number of periods of `length` to look for, default is 1. groupby_length : str, optional Group time series and return `n` periods of `length` for each group. padding : str, optional Either Pandas frequency (e.g. '1D') or 'calendar_week'. If Pandas frequency, symmetric padding is undertaken, either side of `length` If 'calendar_week', padding is fit to the calendar week in which the extreme day(s) are found. normalize : bool, optional If True (default), data is normalized using :func:`~calliope.core.time.funcs.normalized_copy`. kwargs : dict, optional Dimensions of the selected var over which to index. Any remaining dimensions will be flattened by mean """ if normalize: # Only normalise the desired var as rest of data may contain # non-numeric variables! data_n = funcs.normalized_copy(data[var].to_dataset(name=var)) else: data_n = data arr0 = _get_array(data_n, var, tech0, **kwargs) arr1 = _get_array(data_n, var, tech1, **kwargs) arr = arr0 - arr1 return _extreme_with_padding(arr, how, length, n, groupby_length, padding)
def _extreme(arr, how='max', length='1D', n=1, groupby_length=None, padding=None): if groupby_length: groupby = pd.Grouper(freq=groupby_length) group_indices = [] grouping = arr.groupby(groupby) for k in grouping.groups.keys(): s = grouping.get_group(k) group_indices.append(_get_minmax_timestamps(s, length, n, how, padding)) ts_index = _concat_indices(group_indices) else: ts_index = _get_minmax_timestamps(arr, length, n, how, padding) return ts_index def _extreme_with_padding(arr, how, length, n, groupby_length, padding): if padding == 'calendar_week': if n != 1 or length != '1D': raise ValueError( 'calendar_week padding only supports n=1 and length=1D for now.' ) result = _extreme(arr, how, length, n, groupby_length, padding=None) # get week padding for each day in result days = list(result.groupby(result.dayofyear).values()) weeks = pd.DatetimeIndex(days[0]) for d in days: weeks = weeks.union(_calendar_week_padding(d, arr)) # concatenate the weeks into one index and drop possible duplicates return pd.DatetimeIndex(weeks).drop_duplicates() else: return _extreme(arr, how, length, n, groupby_length, padding) def _calendar_week_padding(day, arr): """ Given a day, returns the whole calendar week which contains that day """ days = len(day.day.unique()) if not days == 1: raise ValueError( 'Only a single day at a time may be used for calendar_week padding, ' 'but {} days were passed.'.format(days) ) # Using day of week, figure out how many days before and after to get # a complete week days_before = 6 - day[0].dayofweek days_after = 6 - days_before # Turn it into a week start_time = day[0] - pd.Timedelta('{}D'.format(days_before)) end_time = day[-1] + pd.Timedelta('{}D'.format(days_after)) before = arr[start_time:day[0]].index[:-1] after = arr[day[-1]:end_time].index[1:] result_week = before.append(day).append(after) return result_week